Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Primary caching 7: Always expose the data time in query responses #4711

Merged
merged 5 commits into from
Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/re_data_store/benches/data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ fn latest_data_at<const N: usize>(

store
.latest_at(&timeline_query, &ent_path, primary, secondaries)
.map_or_else(|| [(); N].map(|_| None), |(_, cells)| cells)
.map_or_else(|| [(); N].map(|_| None), |(_, _, cells)| cells)
}

fn range_data<const N: usize>(
Expand Down
5 changes: 4 additions & 1 deletion crates/re_data_store/src/polars_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ pub fn latest_component(
let components = &[cluster_key, primary];
let (_, cells) = store
.latest_at(query, ent_path, primary, components)
.unwrap_or((RowId::ZERO, [(); 2].map(|_| None)));
.map_or_else(
|| (RowId::ZERO, [(); 2].map(|_| None)),
|(_, row_id, cells)| (row_id, cells),
);

dataframe_from_cells(&cells)
}
Expand Down
51 changes: 36 additions & 15 deletions crates/re_data_store/src/store_helpers.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,35 @@
use re_log_types::{DataCell, DataRow, EntityPath, RowId, TimePoint, Timeline};
use re_log_types::{DataCell, DataRow, EntityPath, RowId, TimeInt, TimePoint, Timeline};

use re_types_core::{Component, ComponentName};

use crate::{DataStore, LatestAtQuery};

// --- Read ---

/// A [`Component`] versioned with a specific [`RowId`].
/// A [`Component`] at a specific _data_ time, versioned with a specific [`RowId`].
///
/// This is not enough to globally, uniquely identify an instance of a component.
/// For that you will need to combine the `InstancePath` that was used to query
/// the versioned component with the returned [`RowId`], therefore creating a
/// `VersionedInstancePath`.
#[derive(Debug, Clone)]
pub struct VersionedComponent<C: Component> {
/// `None` if timeless.
pub data_time: Option<TimeInt>,

pub row_id: RowId,

pub value: C,
}

impl<C: Component> From<(RowId, C)> for VersionedComponent<C> {
impl<C: Component> VersionedComponent<C> {
#[inline]
fn from((row_id, value): (RowId, C)) -> Self {
Self { row_id, value }
pub fn new(time: Option<TimeInt>, row_id: RowId, value: C) -> Self {
Self {
data_time: time,
row_id,
value,
}
}
}

Expand All @@ -35,7 +43,8 @@ impl<C: Component> std::ops::Deref for VersionedComponent<C> {
}

impl DataStore {
/// Get the latest value for a given [`re_types_core::Component`] and the associated [`RowId`].
/// Get the latest value for a given [`re_types_core::Component`], as well as the associated
/// _data_ time and [`RowId`].
///
/// This assumes that the row we get from the store only contains a single instance for this
/// component; it will generate a log message of `level` otherwise.
Expand All @@ -51,7 +60,8 @@ impl DataStore {
) -> Option<VersionedComponent<C>> {
re_tracing::profile_function!();

let (row_id, cells) = self.latest_at(query, entity_path, C::name(), &[C::name()])?;
let (data_time, row_id, cells) =
self.latest_at(query, entity_path, C::name(), &[C::name()])?;
let cell = cells.first()?.as_ref()?;

cell.try_to_native_mono::<C>()
Expand Down Expand Up @@ -93,10 +103,11 @@ impl DataStore {
err
})
.ok()?
.map(|c| (row_id, c).into())
.map(|c| VersionedComponent::new(data_time, row_id, c))
}

/// Get the latest value for a given [`re_types_core::Component`] and the associated [`RowId`].
/// Get the latest value for a given [`re_types_core::Component`], as well as the associated
/// _data_ time and [`RowId`].
///
/// This assumes that the row we get from the store only contains a single instance for this
/// component; it will log a warning otherwise.
Expand All @@ -113,7 +124,8 @@ impl DataStore {
self.query_latest_component_with_log_level(entity_path, query, re_log::Level::Warn)
}

/// Get the latest value for a given [`re_types_core::Component`] and the associated [`RowId`].
/// Get the latest value for a given [`re_types_core::Component`], as well as the associated
/// _data_ time and [`RowId`].
///
/// This assumes that the row we get from the store only contains a single instance for this
/// component; it will return None and log a debug message otherwise.
Expand All @@ -140,15 +152,16 @@ impl DataStore {

let mut cur_path = Some(entity_path.clone());
while let Some(path) = cur_path {
if let Some(c) = self.query_latest_component::<C>(&path, query) {
return Some((path, c));
if let Some(vc) = self.query_latest_component::<C>(&path, query) {
return Some((path, vc));
}
cur_path = path.parent();
}
None
}

/// Get the latest value for a given [`re_types_core::Component`] and the associated [`RowId`], assuming it is timeless.
/// Get the latest value for a given [`re_types_core::Component`] and the associated [`RowId`],
/// assuming it is timeless.
///
/// This assumes that the row we get from the store only contains a single instance for this
/// component; it will log a warning otherwise.
Expand All @@ -163,10 +176,14 @@ impl DataStore {
re_tracing::profile_function!();

let query = LatestAtQuery::latest(Timeline::default());
self.query_latest_component(entity_path, &query)
self.query_latest_component(entity_path, &query).map(|vc| {
debug_assert!(vc.data_time.is_none());
vc
})
}

/// Get the latest value for a given [`re_types_core::Component`] and the associated [`RowId`], assuming it is timeless.
/// Get the latest value for a given [`re_types_core::Component`] and the associated [`RowId`],
/// assuming it is timeless.
///
/// This assumes that the row we get from the store only contains a single instance for this
/// component; it will return None and log a debug message otherwise.
Expand All @@ -182,6 +199,10 @@ impl DataStore {

let query = LatestAtQuery::latest(Timeline::default());
self.query_latest_component_quiet(entity_path, &query)
.map(|vc| {
debug_assert!(vc.data_time.is_none());
vc
})
}
}

Expand Down
85 changes: 52 additions & 33 deletions crates/re_data_store/src/store_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,9 @@ impl DataStore {
/// Queries the datastore for the cells of the specified `components`, as seen from the point
/// of view of the so-called `primary` component.
///
/// Returns an array of [`DataCell`]s on success, or `None` otherwise.
/// Success is defined by one thing and thing only: whether a cell could be found for the
/// Returns an array of [`DataCell`]s (as well as the associated _data_ time and `RowId`) on
/// success.
/// Success is defined by one thing and one thing only: whether a cell could be found for the
/// `primary` component.
/// The presence or absence of secondary components has no effect on the success criteria.
///
Expand Down Expand Up @@ -238,7 +239,10 @@ impl DataStore {
/// let components = &[cluster_key, primary];
/// let (_, cells) = store
/// .latest_at(&query, ent_path, primary, components)
/// .unwrap_or((RowId::ZERO, [(); 2].map(|_| None)));
/// .map_or_else(
/// || (RowId::ZERO, [(); 2].map(|_| None)),
/// |(_, row_id, cells)| (row_id, cells),
/// );
///
/// let series: Result<Vec<_>, _> = cells
/// .iter()
Expand Down Expand Up @@ -266,7 +270,7 @@ impl DataStore {
ent_path: &EntityPath,
primary: ComponentName,
components: &[ComponentName; N],
) -> Option<(RowId, [Option<DataCell>; N])> {
) -> Option<(Option<TimeInt>, RowId, [Option<DataCell>; N])> {
// TODO(cmc): kind & query_id need to somehow propagate through the span system.
self.query_id.fetch_add(1, Ordering::Relaxed);

Expand All @@ -282,7 +286,7 @@ impl DataStore {
"query started…"
);

let cells = self
let results = self
.tables
.get(&(ent_path_hash, query.timeline))
.and_then(|table| {
Expand All @@ -301,11 +305,11 @@ impl DataStore {

// If we've found everything we were looking for in the temporal table, then we can
// return the results immediately.
if cells
if results
.as_ref()
.map_or(false, |(_, cells)| cells.iter().all(Option::is_some))
.map_or(false, |(_, _, cells)| cells.iter().all(Option::is_some))
{
return cells;
return results.map(|(data_time, row_id, cells)| (Some(data_time), row_id, cells));
}

let cells_timeless = self.timeless_tables.get(&ent_path_hash).and_then(|table| {
Expand All @@ -324,21 +328,28 @@ impl DataStore {
});

// Otherwise, let's see what's in the timeless table, and then..:
match (cells, cells_timeless) {
match (results, cells_timeless) {
// nothing in the timeless table: return those partial cells we got.
(Some(cells), None) => return Some(cells),
(results @ Some(_), None) => {
return results.map(|(data_time, row_id, cells)| (Some(data_time), row_id, cells))
}

// no temporal cells, but some timeless ones: return those as-is.
(None, Some(cells_timeless)) => return Some(cells_timeless),
(None, results @ Some(_)) => {
return results.map(|(row_id, cells)| (None, row_id, cells))
}

// we have both temporal & timeless cells: let's merge the two when it makes sense
// and return the end result.
(Some((row_id, mut cells)), Some((_, cells_timeless))) => {
(Some((data_time, row_id, mut cells)), Some((_, cells_timeless))) => {
for (i, row_idx) in cells_timeless.into_iter().enumerate() {
if cells[i].is_none() {
cells[i] = row_idx;
}
}
return Some((row_id, cells));
return Some((Some(data_time), row_id, cells));
}

// no cells at all.
(None, None) => {}
}
Expand Down Expand Up @@ -428,7 +439,10 @@ impl DataStore {
/// let query = LatestAtQuery::new(query.timeline, latest_time);
/// let (_, cells) = store
/// .latest_at(&query, ent_path, primary, &components)
/// .unwrap_or((RowId::ZERO, [(); 2].map(|_| None)));
/// .map_or_else(
/// || (RowId::ZERO, [(); 2].map(|_| None)),
/// |(_, row_id, cells)| (row_id, cells),
/// );
/// dataframe_from_cells(cells)
/// };
///
Expand Down Expand Up @@ -519,14 +533,14 @@ impl IndexedTable {
/// Queries the table for the cells of the specified `components`, as seen from the point
/// of view of the so-called `primary` component.
///
/// Returns an array of [`DataCell`]s on success, or `None` iff no cell could be found for
/// the `primary` component.
/// Returns an array of [`DataCell`]s (as well as the associated _data_ time and `RowId`) on
/// success, or `None` iff no cell could be found for the `primary` component.
pub fn latest_at<const N: usize>(
&self,
time: TimeInt,
query_time: TimeInt,
primary: ComponentName,
components: &[ComponentName; N],
) -> Option<(RowId, [Option<DataCell>; N])> {
) -> Option<(TimeInt, RowId, [Option<DataCell>; N])> {
// Early-exit if this entire table is unaware of this component.
if !self.all_components.contains(&primary) {
return None;
Expand All @@ -542,22 +556,22 @@ impl IndexedTable {
// multiple indexed buckets within the same table!

let buckets = self
.range_buckets_rev(..=time)
.range_buckets_rev(..=query_time)
.map(|(_, bucket)| bucket)
.enumerate();
for (attempt, bucket) in buckets {
trace!(
kind = "latest_at",
timeline = %timeline.name(),
time = timeline.typ().format_utc(time),
time = timeline.typ().format_utc(query_time),
%primary,
?components,
attempt,
bucket_time_range = timeline.typ().format_range_utc(bucket.inner.read().time_range),
"found candidate bucket"
);
if let cells @ Some(_) = bucket.latest_at(time, primary, components) {
return cells; // found at least the primary component!
if let ret @ Some(_) = bucket.latest_at(query_time, primary, components) {
return ret; // found at least the primary component!
}
}

Expand Down Expand Up @@ -717,14 +731,14 @@ impl IndexedBucket {
/// Queries the bucket for the cells of the specified `components`, as seen from the point
/// of view of the so-called `primary` component.
///
/// Returns an array of [`DataCell`]s on success, or `None` iff no cell could be found for
/// the `primary` component.
/// Returns an array of [`DataCell`]s (as well as the associated _data_ time and `RowId`) on
/// success, or `None` iff no cell could be found for the `primary` component.
pub fn latest_at<const N: usize>(
&self,
time: TimeInt,
query_time: TimeInt,
primary: ComponentName,
components: &[ComponentName; N],
) -> Option<(RowId, [Option<DataCell>; N])> {
) -> Option<(TimeInt, RowId, [Option<DataCell>; N])> {
self.sort_indices_if_needed();

let IndexedBucketInner {
Expand All @@ -748,11 +762,12 @@ impl IndexedBucket {
%primary,
?components,
timeline = %self.timeline.name(),
time = self.timeline.typ().format_utc(time),
query_time = self.timeline.typ().format_utc(query_time),
"searching for primary & secondary cells…"
);

let time_row_nr = col_time.partition_point(|t| *t <= time.as_i64()) as i64;
let time_row_nr =
col_time.partition_point(|data_time| *data_time <= query_time.as_i64()) as i64;

// The partition point is always _beyond_ the index that we're looking for.
// A partition point of 0 thus means that we're trying to query for data that lives
Expand All @@ -769,7 +784,7 @@ impl IndexedBucket {
%primary,
?components,
timeline = %self.timeline.name(),
time = self.timeline.typ().format_utc(time),
query_time = self.timeline.typ().format_utc(query_time),
%primary_row_nr,
"found primary row number",
);
Expand All @@ -783,7 +798,7 @@ impl IndexedBucket {
%primary,
?components,
timeline = %self.timeline.name(),
time = self.timeline.typ().format_utc(time),
query_time = self.timeline.typ().format_utc(query_time),
%primary_row_nr,
"no secondary row number found",
);
Expand All @@ -797,7 +812,7 @@ impl IndexedBucket {
%primary,
?components,
timeline = %self.timeline.name(),
time = self.timeline.typ().format_utc(time),
query_time = self.timeline.typ().format_utc(query_time),
%primary_row_nr, %secondary_row_nr,
"found secondary row number",
);
Expand All @@ -812,7 +827,7 @@ impl IndexedBucket {
%primary,
%component,
timeline = %self.timeline.name(),
time = self.timeline.typ().format_utc(time),
query_time = self.timeline.typ().format_utc(query_time),
%primary_row_nr, %secondary_row_nr,
"found cell",
);
Expand All @@ -821,7 +836,11 @@ impl IndexedBucket {
}
}

Some((col_row_id[secondary_row_nr as usize], cells))
Some((
col_time[secondary_row_nr as usize].into(),
col_row_id[secondary_row_nr as usize],
cells,
))
}

/// Iterates the bucket in order to return the cells of the specified `components`,
Expand Down
Loading
Loading