Skip to content

Commit

Permalink
Primary caching 7: Always expose the data time in query responses (#4711
Browse files Browse the repository at this point in the history
)

_99% grunt work, the only somewhat interesting thing happens in
`query_archetype`_

Our query model always operates with two distinct timestamps: the
timestamp you're querying for (`query_time`) vs. the timestamp of the
data you get back (`data_time`).

This is the result of our latest-at semantics: a query for a point at
time `10` can return a point at time `2`.
This is important to know when caching the data: a query at time `4` and
a query at time `8` that both return the data at time `2` must share the
same single entry or the memory budget would explode.

This PR just updates all existing latest-at APIs so they return the data
time in their response.
This was already the case for range APIs.

Note that in the case of `query_archetype`, which is a compound API that
emits multiple queries, the data time of the final result is the most
recent data time among all of its components.

A follow-up PR will use the data time to deduplicate entries in the
latest-at cache.

---

Part of the primary caching series of PRs (index search, joins,
deserialization):
- #4592
- #4593
- #4659
- #4680 
- #4681
- #4698
- #4711
- #4712
- #4721 
- #4726 
- #4773
- #4784
- #4785
- #4793
- #4800
  • Loading branch information
teh-cmc authored Jan 15, 2024
1 parent d8569ee commit 8dd1681
Show file tree
Hide file tree
Showing 20 changed files with 269 additions and 173 deletions.
2 changes: 1 addition & 1 deletion crates/re_data_store/benches/data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ fn latest_data_at<const N: usize>(

store
.latest_at(&timeline_query, &ent_path, primary, secondaries)
.map_or_else(|| [(); N].map(|_| None), |(_, cells)| cells)
.map_or_else(|| [(); N].map(|_| None), |(_, _, cells)| cells)
}

fn range_data<const N: usize>(
Expand Down
5 changes: 4 additions & 1 deletion crates/re_data_store/src/polars_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ pub fn latest_component(
let components = &[cluster_key, primary];
let (_, cells) = store
.latest_at(query, ent_path, primary, components)
.unwrap_or((RowId::ZERO, [(); 2].map(|_| None)));
.map_or_else(
|| (RowId::ZERO, [(); 2].map(|_| None)),
|(_, row_id, cells)| (row_id, cells),
);

dataframe_from_cells(&cells)
}
Expand Down
51 changes: 36 additions & 15 deletions crates/re_data_store/src/store_helpers.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,35 @@
use re_log_types::{DataCell, DataRow, EntityPath, RowId, TimePoint, Timeline};
use re_log_types::{DataCell, DataRow, EntityPath, RowId, TimeInt, TimePoint, Timeline};

use re_types_core::{Component, ComponentName};

use crate::{DataStore, LatestAtQuery};

// --- Read ---

/// A [`Component`] versioned with a specific [`RowId`].
/// A [`Component`] at a specific _data_ time, versioned with a specific [`RowId`].
///
/// This is not enough to globally, uniquely identify an instance of a component.
/// For that you will need to combine the `InstancePath` that was used to query
/// the versioned component with the returned [`RowId`], therefore creating a
/// `VersionedInstancePath`.
#[derive(Debug, Clone)]
pub struct VersionedComponent<C: Component> {
/// `None` if timeless.
pub data_time: Option<TimeInt>,

pub row_id: RowId,

pub value: C,
}

impl<C: Component> From<(RowId, C)> for VersionedComponent<C> {
impl<C: Component> VersionedComponent<C> {
#[inline]
fn from((row_id, value): (RowId, C)) -> Self {
Self { row_id, value }
pub fn new(time: Option<TimeInt>, row_id: RowId, value: C) -> Self {
Self {
data_time: time,
row_id,
value,
}
}
}

Expand All @@ -35,7 +43,8 @@ impl<C: Component> std::ops::Deref for VersionedComponent<C> {
}

impl DataStore {
/// Get the latest value for a given [`re_types_core::Component`] and the associated [`RowId`].
/// Get the latest value for a given [`re_types_core::Component`], as well as the associated
/// _data_ time and [`RowId`].
///
/// This assumes that the row we get from the store only contains a single instance for this
/// component; it will generate a log message of `level` otherwise.
Expand All @@ -51,7 +60,8 @@ impl DataStore {
) -> Option<VersionedComponent<C>> {
re_tracing::profile_function!();

let (row_id, cells) = self.latest_at(query, entity_path, C::name(), &[C::name()])?;
let (data_time, row_id, cells) =
self.latest_at(query, entity_path, C::name(), &[C::name()])?;
let cell = cells.first()?.as_ref()?;

cell.try_to_native_mono::<C>()
Expand Down Expand Up @@ -93,10 +103,11 @@ impl DataStore {
err
})
.ok()?
.map(|c| (row_id, c).into())
.map(|c| VersionedComponent::new(data_time, row_id, c))
}

/// Get the latest value for a given [`re_types_core::Component`] and the associated [`RowId`].
/// Get the latest value for a given [`re_types_core::Component`], as well as the associated
/// _data_ time and [`RowId`].
///
/// This assumes that the row we get from the store only contains a single instance for this
/// component; it will log a warning otherwise.
Expand All @@ -113,7 +124,8 @@ impl DataStore {
self.query_latest_component_with_log_level(entity_path, query, re_log::Level::Warn)
}

/// Get the latest value for a given [`re_types_core::Component`] and the associated [`RowId`].
/// Get the latest value for a given [`re_types_core::Component`], as well as the associated
/// _data_ time and [`RowId`].
///
/// This assumes that the row we get from the store only contains a single instance for this
/// component; it will return None and log a debug message otherwise.
Expand All @@ -140,15 +152,16 @@ impl DataStore {

let mut cur_path = Some(entity_path.clone());
while let Some(path) = cur_path {
if let Some(c) = self.query_latest_component::<C>(&path, query) {
return Some((path, c));
if let Some(vc) = self.query_latest_component::<C>(&path, query) {
return Some((path, vc));
}
cur_path = path.parent();
}
None
}

/// Get the latest value for a given [`re_types_core::Component`] and the associated [`RowId`], assuming it is timeless.
/// Get the latest value for a given [`re_types_core::Component`] and the associated [`RowId`],
/// assuming it is timeless.
///
/// This assumes that the row we get from the store only contains a single instance for this
/// component; it will log a warning otherwise.
Expand All @@ -163,10 +176,14 @@ impl DataStore {
re_tracing::profile_function!();

let query = LatestAtQuery::latest(Timeline::default());
self.query_latest_component(entity_path, &query)
self.query_latest_component(entity_path, &query).map(|vc| {
debug_assert!(vc.data_time.is_none());
vc
})
}

/// Get the latest value for a given [`re_types_core::Component`] and the associated [`RowId`], assuming it is timeless.
/// Get the latest value for a given [`re_types_core::Component`] and the associated [`RowId`],
/// assuming it is timeless.
///
/// This assumes that the row we get from the store only contains a single instance for this
/// component; it will return None and log a debug message otherwise.
Expand All @@ -182,6 +199,10 @@ impl DataStore {

let query = LatestAtQuery::latest(Timeline::default());
self.query_latest_component_quiet(entity_path, &query)
.map(|vc| {
debug_assert!(vc.data_time.is_none());
vc
})
}
}

Expand Down
85 changes: 52 additions & 33 deletions crates/re_data_store/src/store_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,9 @@ impl DataStore {
/// Queries the datastore for the cells of the specified `components`, as seen from the point
/// of view of the so-called `primary` component.
///
/// Returns an array of [`DataCell`]s on success, or `None` otherwise.
/// Success is defined by one thing and thing only: whether a cell could be found for the
/// Returns an array of [`DataCell`]s (as well as the associated _data_ time and `RowId`) on
/// success.
/// Success is defined by one thing and one thing only: whether a cell could be found for the
/// `primary` component.
/// The presence or absence of secondary components has no effect on the success criteria.
///
Expand Down Expand Up @@ -238,7 +239,10 @@ impl DataStore {
/// let components = &[cluster_key, primary];
/// let (_, cells) = store
/// .latest_at(&query, ent_path, primary, components)
/// .unwrap_or((RowId::ZERO, [(); 2].map(|_| None)));
/// .map_or_else(
/// || (RowId::ZERO, [(); 2].map(|_| None)),
/// |(_, row_id, cells)| (row_id, cells),
/// );
///
/// let series: Result<Vec<_>, _> = cells
/// .iter()
Expand Down Expand Up @@ -266,7 +270,7 @@ impl DataStore {
ent_path: &EntityPath,
primary: ComponentName,
components: &[ComponentName; N],
) -> Option<(RowId, [Option<DataCell>; N])> {
) -> Option<(Option<TimeInt>, RowId, [Option<DataCell>; N])> {
// TODO(cmc): kind & query_id need to somehow propagate through the span system.
self.query_id.fetch_add(1, Ordering::Relaxed);

Expand All @@ -282,7 +286,7 @@ impl DataStore {
"query started…"
);

let cells = self
let results = self
.tables
.get(&(ent_path_hash, query.timeline))
.and_then(|table| {
Expand All @@ -301,11 +305,11 @@ impl DataStore {

// If we've found everything we were looking for in the temporal table, then we can
// return the results immediately.
if cells
if results
.as_ref()
.map_or(false, |(_, cells)| cells.iter().all(Option::is_some))
.map_or(false, |(_, _, cells)| cells.iter().all(Option::is_some))
{
return cells;
return results.map(|(data_time, row_id, cells)| (Some(data_time), row_id, cells));
}

let cells_timeless = self.timeless_tables.get(&ent_path_hash).and_then(|table| {
Expand All @@ -324,21 +328,28 @@ impl DataStore {
});

// Otherwise, let's see what's in the timeless table, and then..:
match (cells, cells_timeless) {
match (results, cells_timeless) {
// nothing in the timeless table: return those partial cells we got.
(Some(cells), None) => return Some(cells),
(results @ Some(_), None) => {
return results.map(|(data_time, row_id, cells)| (Some(data_time), row_id, cells))
}

// no temporal cells, but some timeless ones: return those as-is.
(None, Some(cells_timeless)) => return Some(cells_timeless),
(None, results @ Some(_)) => {
return results.map(|(row_id, cells)| (None, row_id, cells))
}

// we have both temporal & timeless cells: let's merge the two when it makes sense
// and return the end result.
(Some((row_id, mut cells)), Some((_, cells_timeless))) => {
(Some((data_time, row_id, mut cells)), Some((_, cells_timeless))) => {
for (i, row_idx) in cells_timeless.into_iter().enumerate() {
if cells[i].is_none() {
cells[i] = row_idx;
}
}
return Some((row_id, cells));
return Some((Some(data_time), row_id, cells));
}

// no cells at all.
(None, None) => {}
}
Expand Down Expand Up @@ -428,7 +439,10 @@ impl DataStore {
/// let query = LatestAtQuery::new(query.timeline, latest_time);
/// let (_, cells) = store
/// .latest_at(&query, ent_path, primary, &components)
/// .unwrap_or((RowId::ZERO, [(); 2].map(|_| None)));
/// .map_or_else(
/// || (RowId::ZERO, [(); 2].map(|_| None)),
/// |(_, row_id, cells)| (row_id, cells),
/// );
/// dataframe_from_cells(cells)
/// };
///
Expand Down Expand Up @@ -519,14 +533,14 @@ impl IndexedTable {
/// Queries the table for the cells of the specified `components`, as seen from the point
/// of view of the so-called `primary` component.
///
/// Returns an array of [`DataCell`]s on success, or `None` iff no cell could be found for
/// the `primary` component.
/// Returns an array of [`DataCell`]s (as well as the associated _data_ time and `RowId`) on
/// success, or `None` iff no cell could be found for the `primary` component.
pub fn latest_at<const N: usize>(
&self,
time: TimeInt,
query_time: TimeInt,
primary: ComponentName,
components: &[ComponentName; N],
) -> Option<(RowId, [Option<DataCell>; N])> {
) -> Option<(TimeInt, RowId, [Option<DataCell>; N])> {
// Early-exit if this entire table is unaware of this component.
if !self.all_components.contains(&primary) {
return None;
Expand All @@ -542,22 +556,22 @@ impl IndexedTable {
// multiple indexed buckets within the same table!

let buckets = self
.range_buckets_rev(..=time)
.range_buckets_rev(..=query_time)
.map(|(_, bucket)| bucket)
.enumerate();
for (attempt, bucket) in buckets {
trace!(
kind = "latest_at",
timeline = %timeline.name(),
time = timeline.typ().format_utc(time),
time = timeline.typ().format_utc(query_time),
%primary,
?components,
attempt,
bucket_time_range = timeline.typ().format_range_utc(bucket.inner.read().time_range),
"found candidate bucket"
);
if let cells @ Some(_) = bucket.latest_at(time, primary, components) {
return cells; // found at least the primary component!
if let ret @ Some(_) = bucket.latest_at(query_time, primary, components) {
return ret; // found at least the primary component!
}
}

Expand Down Expand Up @@ -717,14 +731,14 @@ impl IndexedBucket {
/// Queries the bucket for the cells of the specified `components`, as seen from the point
/// of view of the so-called `primary` component.
///
/// Returns an array of [`DataCell`]s on success, or `None` iff no cell could be found for
/// the `primary` component.
/// Returns an array of [`DataCell`]s (as well as the associated _data_ time and `RowId`) on
/// success, or `None` iff no cell could be found for the `primary` component.
pub fn latest_at<const N: usize>(
&self,
time: TimeInt,
query_time: TimeInt,
primary: ComponentName,
components: &[ComponentName; N],
) -> Option<(RowId, [Option<DataCell>; N])> {
) -> Option<(TimeInt, RowId, [Option<DataCell>; N])> {
self.sort_indices_if_needed();

let IndexedBucketInner {
Expand All @@ -748,11 +762,12 @@ impl IndexedBucket {
%primary,
?components,
timeline = %self.timeline.name(),
time = self.timeline.typ().format_utc(time),
query_time = self.timeline.typ().format_utc(query_time),
"searching for primary & secondary cells…"
);

let time_row_nr = col_time.partition_point(|t| *t <= time.as_i64()) as i64;
let time_row_nr =
col_time.partition_point(|data_time| *data_time <= query_time.as_i64()) as i64;

// The partition point is always _beyond_ the index that we're looking for.
// A partition point of 0 thus means that we're trying to query for data that lives
Expand All @@ -769,7 +784,7 @@ impl IndexedBucket {
%primary,
?components,
timeline = %self.timeline.name(),
time = self.timeline.typ().format_utc(time),
query_time = self.timeline.typ().format_utc(query_time),
%primary_row_nr,
"found primary row number",
);
Expand All @@ -783,7 +798,7 @@ impl IndexedBucket {
%primary,
?components,
timeline = %self.timeline.name(),
time = self.timeline.typ().format_utc(time),
query_time = self.timeline.typ().format_utc(query_time),
%primary_row_nr,
"no secondary row number found",
);
Expand All @@ -797,7 +812,7 @@ impl IndexedBucket {
%primary,
?components,
timeline = %self.timeline.name(),
time = self.timeline.typ().format_utc(time),
query_time = self.timeline.typ().format_utc(query_time),
%primary_row_nr, %secondary_row_nr,
"found secondary row number",
);
Expand All @@ -812,7 +827,7 @@ impl IndexedBucket {
%primary,
%component,
timeline = %self.timeline.name(),
time = self.timeline.typ().format_utc(time),
query_time = self.timeline.typ().format_utc(query_time),
%primary_row_nr, %secondary_row_nr,
"found cell",
);
Expand All @@ -821,7 +836,11 @@ impl IndexedBucket {
}
}

Some((col_row_id[secondary_row_nr as usize], cells))
Some((
col_time[secondary_row_nr as usize].into(),
col_row_id[secondary_row_nr as usize],
cells,
))
}

/// Iterates the bucket in order to return the cells of the specified `components`,
Expand Down
Loading

0 comments on commit 8dd1681

Please sign in to comment.