Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Primary caching 17: timeless range #4852

Merged
merged 5 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion crates/re_data_store/src/store_format.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use re_format::{format_bytes, format_number};
use re_log_types::TimeInt;
use re_types_core::SizeBytes as _;

use crate::{DataStore, IndexedBucket, IndexedTable, PersistentIndexedTable};
Expand Down Expand Up @@ -129,7 +130,7 @@ impl std::fmt::Display for IndexedBucket {

let time_range = {
let time_range = &self.inner.read().time_range;
if time_range.min.as_i64() != i64::MAX && time_range.max.as_i64() != i64::MIN {
if time_range.min != TimeInt::MAX && time_range.max != TimeInt::MIN {
format!(
" - {}: {}",
self.timeline.name(),
Expand Down
4 changes: 2 additions & 2 deletions crates/re_data_store/src/store_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl std::fmt::Debug for RangeQuery {
self.timeline.typ().format_utc(self.range.min),
self.timeline.typ().format_utc(self.range.max),
self.timeline.name(),
if self.range.min == TimeInt::MIN {
if self.range.min <= TimeInt::MIN {
"including"
} else {
"excluding"
Expand Down Expand Up @@ -494,7 +494,7 @@ impl DataStore {
.flatten()
.map(|(time, row_id, cells)| (Some(time), row_id, cells));

if query.range.min == TimeInt::MIN {
if query.range.min <= TimeInt::MIN {
let timeless = self
.timeless_tables
.get(&ent_path_hash)
Expand Down
16 changes: 13 additions & 3 deletions crates/re_data_store/src/store_sanity.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use re_log_types::{DataCellColumn, NumInstances, RowId, TimeRange, VecDequeSortingExt as _};
use re_log_types::{
DataCellColumn, NumInstances, RowId, TimeInt, TimeRange, VecDequeSortingExt as _,
};
use re_types_core::{ComponentName, Loggable, SizeBytes as _};

use crate::{
Expand Down Expand Up @@ -179,8 +181,16 @@ impl IndexedBucket {
let mut times = col_time.clone();
times.sort();

let expected_min = times.front().copied().unwrap_or(i64::MAX).into();
let expected_max = times.back().copied().unwrap_or(i64::MIN).into();
let expected_min = times
.front()
.copied()
.unwrap_or(TimeInt::MAX.as_i64())
.into();
let expected_max = times
.back()
.copied()
.unwrap_or(TimeInt::MIN.as_i64())
.into();
let expected_time_range = TimeRange::new(expected_min, expected_max);

if expected_time_range != *time_range {
Expand Down
2 changes: 1 addition & 1 deletion crates/re_data_store/tests/correctness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ fn range_join_across_single_row_impl(store: &mut DataStore) {
let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence);
let query = re_data_store::RangeQuery::new(
timeline_frame_nr,
re_data_store::TimeRange::new(i64::MIN.into(), i64::MAX.into()),
re_data_store::TimeRange::new(TimeInt::MIN, TimeInt::MAX),
);
let components = [InstanceKey::name(), Position2D::name(), Color::name()];
let dfs = re_data_store::polars_util::range_components(
Expand Down
1 change: 1 addition & 0 deletions crates/re_log_types/src/data_row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ impl std::fmt::Display for RowId {

impl RowId {
pub const ZERO: Self = Self(re_tuid::Tuid::ZERO);
pub const MAX: Self = Self(re_tuid::Tuid::MAX);

/// Create a new unique [`RowId`] based on the current time.
#[allow(clippy::new_without_default)]
Expand Down
1 change: 1 addition & 0 deletions crates/re_log_types/src/time_point/time_int.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ impl TimeInt {
// a bit of leeway.
pub const BEGINNING: Self = Self(i64::MIN / 2);

// TODO(#4832): `TimeInt::BEGINNING` vs. `TimeInt::MIN` vs. `Option<TimeInt>`…
pub const MIN: Self = Self(i64::MIN);
pub const MAX: Self = Self(i64::MAX);

Expand Down
96 changes: 85 additions & 11 deletions crates/re_query_cache/src/cache.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{
collections::{BTreeMap, VecDeque},
ops::Range,
sync::Arc,
};

Expand Down Expand Up @@ -327,6 +328,8 @@ impl CachesPerArchetype {

re_tracing::profile_function!();

// TODO(cmc): range invalidation

for latest_at_cache in self.latest_at_per_archetype.read().values() {
let mut latest_at_cache = latest_at_cache.write();

Expand Down Expand Up @@ -419,12 +422,6 @@ pub struct CacheBucket {
}

impl CacheBucket {
/// Iterate over the timestamps of the point-of-view components.
#[inline]
pub fn iter_data_times(&self) -> impl Iterator<Item = &(TimeInt, RowId)> {
self.data_times.iter()
}

#[inline]
pub fn time_range(&self) -> Option<TimeRange> {
let first_time = self.data_times.front().map(|(t, _)| *t)?;
Expand All @@ -444,6 +441,25 @@ impl CacheBucket {
self.data_times.binary_search(&(data_time, row_id)).is_ok()
}

/// How many timestamps' worth of data is stored in this bucket?
#[inline]
pub fn num_entries(&self) -> usize {
self.data_times.len()
}

#[inline]
pub fn is_empty(&self) -> bool {
self.num_entries() == 0
}

// ---

/// Iterate over the timestamps of the point-of-view components.
#[inline]
pub fn iter_data_times(&self) -> impl Iterator<Item = &(TimeInt, RowId)> {
self.data_times.iter()
}

/// Iterate over the [`InstanceKey`] batches of the point-of-view components.
#[inline]
pub fn iter_pov_instance_keys(&self) -> impl Iterator<Item = &[InstanceKey]> {
Expand Down Expand Up @@ -474,15 +490,73 @@ impl CacheBucket {
Some(data.iter())
}

/// How many timestamps' worth of data is stored in this bucket?
// ---

/// Returns the index range that corresponds to the specified `time_range`.
///
/// Use the returned range with one of the range iteration methods:
/// - [`Self::range_data_times`]
/// - [`Self::range_pov_instance_keys`]
/// - [`Self::range_component`]
/// - [`Self::range_component_opt`]
///
/// Make sure that the bucket hasn't been modified in-between!
///
/// This is `O(2*log(n))`, so make sure to clone the returned range rather than calling this
/// multiple times.
#[inline]
pub fn num_entries(&self) -> usize {
self.data_times.len()
pub fn entry_range(&self, time_range: TimeRange) -> Range<usize> {
let start_index = self
.data_times
.partition_point(|(data_time, _)| data_time < &time_range.min);
let end_index = self
.data_times
.partition_point(|(data_time, _)| data_time <= &time_range.max);
start_index..end_index
}

/// Range over the timestamps of the point-of-view components.
#[inline]
pub fn is_empty(&self) -> bool {
self.num_entries() == 0
pub fn range_data_times(
&self,
entry_range: Range<usize>,
) -> impl Iterator<Item = &(TimeInt, RowId)> {
self.data_times.range(entry_range)
}

/// Range over the [`InstanceKey`] batches of the point-of-view components.
#[inline]
pub fn range_pov_instance_keys(
&self,
entry_range: Range<usize>,
) -> impl Iterator<Item = &[InstanceKey]> {
self.pov_instance_keys.range(entry_range)
}

/// Range over the batches of the specified non-optional component.
#[inline]
pub fn range_component<C: Component + Send + Sync + 'static>(
&self,
entry_range: Range<usize>,
) -> Option<impl Iterator<Item = &[C]>> {
let data = self
.components
.get(&C::name())
.and_then(|data| data.as_any().downcast_ref::<FlatVecDeque<C>>())?;
Some(data.range(entry_range))
}

/// Range over the batches of the specified optional component.
#[inline]
pub fn range_component_opt<C: Component + Send + Sync + 'static>(
&self,
entry_range: Range<usize>,
) -> Option<impl Iterator<Item = &[Option<C>]>> {
let data = self
.components
.get(&C::name())
.and_then(|data| data.as_any().downcast_ref::<FlatVecDeque<Option<C>>>())?;
Some(data.range(entry_range))
}
}

Expand Down
47 changes: 23 additions & 24 deletions crates/re_query_cache/src/cache_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::collections::BTreeMap;
use re_log_types::{EntityPath, TimeRange, Timeline};
use re_types_core::ComponentName;

use crate::{Caches, LatestAtCache, RangeCache};
use crate::{cache::CacheBucket, Caches, LatestAtCache, RangeCache};

// ---

Expand Down Expand Up @@ -71,6 +71,18 @@ impl Caches {
pub fn stats(detailed_stats: bool) -> CachesStats {
re_tracing::profile_function!();

fn upsert_bucket_stats(
per_component: &mut BTreeMap<ComponentName, CachedComponentStats>,
bucket: &CacheBucket,
) {
for (component_name, data) in &bucket.components {
let stats: &mut CachedComponentStats =
per_component.entry(*component_name).or_default();
stats.total_rows += data.dyn_num_entries() as u64;
stats.total_instances += data.dyn_num_values() as u64;
}
}

Self::with(|caches| {
let latest_at = caches
.0
Expand Down Expand Up @@ -98,22 +110,12 @@ impl Caches {
if let Some(per_component) = per_component.as_mut() {
re_tracing::profile_scope!("detailed");

for bucket in per_data_time.values() {
for (component_name, data) in &bucket.read().components {
let stats: &mut CachedComponentStats =
per_component.entry(*component_name).or_default();
stats.total_rows += data.dyn_num_entries() as u64;
stats.total_instances += data.dyn_num_values() as u64;
}
if let Some(bucket) = &timeless {
upsert_bucket_stats(per_component, bucket);
}

if let Some(bucket) = &timeless {
for (component_name, data) in &bucket.components {
let stats: &mut CachedComponentStats =
per_component.entry(*component_name).or_default();
stats.total_rows += data.dyn_num_entries() as u64;
stats.total_instances += data.dyn_num_values() as u64;
}
for bucket in per_data_time.values() {
upsert_bucket_stats(per_component, &bucket.read());
}
}
}
Expand All @@ -140,27 +142,24 @@ impl Caches {
.values()
.map(|range_cache| {
let RangeCache {
bucket,
per_data_time,
timeless,
total_size_bytes,
} = &*range_cache.read();

let total_rows = bucket.data_times.len() as u64;
let total_rows = per_data_time.data_times.len() as u64;

let mut per_component = detailed_stats.then(BTreeMap::default);
if let Some(per_component) = per_component.as_mut() {
re_tracing::profile_scope!("detailed");

for (component_name, data) in &bucket.components {
let stats: &mut CachedComponentStats =
per_component.entry(*component_name).or_default();
stats.total_rows += data.dyn_num_entries() as u64;
stats.total_instances += data.dyn_num_values() as u64;
}
upsert_bucket_stats(per_component, timeless);
upsert_bucket_stats(per_component, per_data_time);
}

(
key.timeline,
bucket.time_range().unwrap_or(TimeRange::EMPTY),
per_data_time.time_range().unwrap_or(TimeRange::EMPTY),
CachedEntityStats {
total_size_bytes: *total_size_bytes,
total_rows,
Expand Down
1 change: 0 additions & 1 deletion crates/re_query_cache/src/latest_at.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ macro_rules! impl_query_archetype_latest_at {
f(data);
}


Ok(())
};

Expand Down
Loading
Loading