Skip to content

Commit

Permalink
Revert "Primary caching 14: don't bake LatestAt(T-1) results into l…
Browse files Browse the repository at this point in the history
…ow-level range queries (#4793)"

This reverts commit dc8cf2d.
  • Loading branch information
teh-cmc committed Jan 16, 2024
1 parent a36243b commit 6295c43
Show file tree
Hide file tree
Showing 8 changed files with 393 additions and 83 deletions.
2 changes: 1 addition & 1 deletion crates/re_data_store/examples/range_components.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Demonstrates usage of [`re_data_store::polars_util::range_components`].
//!
//! ```text
//! POLARS_FMT_MAX_ROWS=100 cargo r -p re_data_store --all-features --example range_components
//! POLARS_FMT_MAX_ROWS=100 cargo r -p re_data_store --example range_components
//! ```
use polars_core::prelude::JoinType;
Expand Down
51 changes: 42 additions & 9 deletions crates/re_data_store/src/polars_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ pub fn latest_components(
/// Iterates over the rows of any number of components and their respective cluster keys, all from
/// the single point-of-view of the `primary` component, returning an iterator of `DataFrame`s.
///
/// An initial dataframe is yielded with the latest-at state at the start of the time range, if
/// there is any.
///
/// The iterator only ever yields dataframes iff the `primary` component has changed.
/// A change affecting only secondary components will not yield a dataframe.
///
Expand Down Expand Up @@ -123,21 +126,51 @@ pub fn range_components<'a, const N: usize>(

let mut state = None;

// NOTE: This will return none for `TimeInt::Min`, i.e. range queries that start infinitely far
// into the past don't have a latest-at state!
let latest_time = query.range.min.as_i64().checked_sub(1).map(Into::into);

let mut df_latest = None;
if let Some(latest_time) = latest_time {
let df = latest_components(
store,
&LatestAtQuery::new(query.timeline, latest_time),
ent_path,
&components,
join_type,
);

if df.as_ref().map_or(false, |df| {
// We only care about the initial state if it A) isn't empty and B) contains any data
// at all for the primary component.
!df.is_empty() && df.column(primary.as_ref()).is_ok()
}) {
df_latest = Some(df);
}
}

let primary_col = components
.iter()
.find_position(|component| **component == primary)
.map(|(col, _)| col)
.unwrap(); // asserted on entry

store
.range(query, ent_path, components)
.map(move |(time, _, cells)| {
(
time,
cells[primary_col].is_some(), // is_primary
dataframe_from_cells(&cells),
)
})
// send the latest-at state before anything else
df_latest
.into_iter()
.map(move |df| (latest_time, true, df))
// followed by the range
.chain(
store
.range(query, ent_path, components)
.map(move |(time, _, cells)| {
(
time,
cells[primary_col].is_some(), // is_primary
dataframe_from_cells(&cells),
)
}),
)
.filter_map(move |(time, is_primary, df)| {
state = Some(join_dataframes(
cluster_key,
Expand Down
109 changes: 87 additions & 22 deletions crates/re_data_store/tests/data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,7 @@ fn range_impl(store: &mut DataStore) {

let ent_path = EntityPath::from("this/that");

let frame0 = TimeInt::from(0);
let frame1 = TimeInt::from(1);
let frame2 = TimeInt::from(2);
let frame3 = TimeInt::from(3);
Expand Down Expand Up @@ -553,40 +554,61 @@ fn range_impl(store: &mut DataStore) {

// Unit ranges (Color's PoV)

// NOTE: Check out [1] to see what the results would've looked like with latest-at semantics at
// T-1 baked in (like we used to do).
//
// [1]: <https://github.com/rerun-io/rerun/blob/790f391/crates/re_data_store/tests/data_store.rs#L555-L837>

assert_range_components(
TimeRange::new(frame1, frame1),
[Color::name(), Position2D::name()],
&[(
Some(frame1),
&[(Color::name(), &row1)], //
)],
&[
(
Some(frame0),
&[(Color::name(), &row4_3), (Position2D::name(), &row4_4)],
), // timeless
(
Some(frame1),
&[
(Color::name(), &row1),
(Position2D::name(), &row4_4), // timeless
],
),
],
);
assert_range_components(
TimeRange::new(frame2, frame2),
[Color::name(), Position2D::name()],
&[],
&[
(
Some(frame1),
&[
(Color::name(), &row1),
(Position2D::name(), &row4_4), // timeless
],
), //
],
);
assert_range_components(
TimeRange::new(frame3, frame3),
[Color::name(), Position2D::name()],
&[],
&[
(
Some(frame2),
&[(Color::name(), &row1), (Position2D::name(), &row2)],
), //
],
);
assert_range_components(
TimeRange::new(frame4, frame4),
[Color::name(), Position2D::name()],
&[
(
Some(frame3),
&[(Color::name(), &row1), (Position2D::name(), &row3)],
),
(
Some(frame4),
&[(Color::name(), &row4_1)], //
&[(Color::name(), &row4_1), (Position2D::name(), &row3)],
),
(
Some(frame4),
&[(Color::name(), &row4_2)], //
&[(Color::name(), &row4_2), (Position2D::name(), &row3)],
),
(
Some(frame4),
Expand All @@ -597,38 +619,65 @@ fn range_impl(store: &mut DataStore) {
assert_range_components(
TimeRange::new(frame5, frame5),
[Color::name(), Position2D::name()],
&[],
&[
(
Some(frame4),
&[(Color::name(), &row4_3), (Position2D::name(), &row4_4)], // !!!
), //
],
);

// Unit ranges (Position2D's PoV)

assert_range_components(
TimeRange::new(frame1, frame1),
[Position2D::name(), Color::name()],
&[],
&[
(
Some(frame0),
&[(Position2D::name(), &row4_4), (Color::name(), &row4_3)],
), // timeless
],
);
assert_range_components(
TimeRange::new(frame2, frame2),
[Position2D::name(), Color::name()],
&[
(
Some(frame1),
&[
(Position2D::name(), &row4_4), // timeless
(Color::name(), &row1),
],
),
(
Some(frame2),
&[(Position2D::name(), &row2)], //
&[(Position2D::name(), &row2), (Color::name(), &row1)],
), //
],
);
assert_range_components(
TimeRange::new(frame3, frame3),
[Position2D::name(), Color::name()],
&[(
Some(frame3),
&[(Position2D::name(), &row3)], //
)],
&[
(
Some(frame2),
&[(Position2D::name(), &row2), (Color::name(), &row1)],
),
(
Some(frame3),
&[(Position2D::name(), &row3), (Color::name(), &row1)],
),
],
);
assert_range_components(
TimeRange::new(frame4, frame4),
[Position2D::name(), Color::name()],
&[
(
Some(frame3),
&[(Position2D::name(), &row3), (Color::name(), &row1)],
),
(
Some(frame4),
&[(Position2D::name(), &row4_25), (Color::name(), &row4_2)],
Expand All @@ -642,7 +691,12 @@ fn range_impl(store: &mut DataStore) {
assert_range_components(
TimeRange::new(frame5, frame5),
[Position2D::name(), Color::name()],
&[],
&[
(
Some(frame4),
&[(Position2D::name(), &row4_4), (Color::name(), &row4_3)],
), //
],
);

// Full range (Color's PoV)
Expand All @@ -651,9 +705,16 @@ fn range_impl(store: &mut DataStore) {
TimeRange::new(frame1, frame5),
[Color::name(), Position2D::name()],
&[
(
Some(frame0),
&[(Color::name(), &row4_3), (Position2D::name(), &row4_4)],
), // timeless
(
Some(frame1),
&[(Color::name(), &row1)], //
&[
(Color::name(), &row1),
(Position2D::name(), &row4_4), // timeless
],
),
(
Some(frame4),
Expand All @@ -676,6 +737,10 @@ fn range_impl(store: &mut DataStore) {
TimeRange::new(frame1, frame5),
[Position2D::name(), Color::name()],
&[
(
Some(frame0),
&[(Position2D::name(), &row4_4), (Color::name(), &row4_3)],
), // timeless
(
Some(frame2),
&[(Position2D::name(), &row2), (Color::name(), &row1)],
Expand Down
80 changes: 56 additions & 24 deletions crates/re_query/src/range.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use itertools::Itertools as _;
use re_data_store::{DataStore, RangeQuery};
use re_data_store::{DataStore, LatestAtQuery, RangeQuery};
use re_log_types::EntityPath;
use re_types_core::{Archetype, ComponentName};

use crate::{ArchetypeView, ComponentWithInstances};
use crate::{get_component_with_instances, ArchetypeView, ComponentWithInstances};

// ---

Expand Down Expand Up @@ -61,29 +61,61 @@ pub fn range_archetype<'a, A: Archetype + 'a, const N: usize>(
.take(components.len())
.collect();

store
.range(query, ent_path, components)
.map(move |(data_time, row_id, mut cells)| {
// NOTE: The unwrap cannot fail, the cluster key's presence is guaranteed
// by the store.
let instance_keys = cells[cluster_col].take().unwrap();
let is_primary = cells[primary_col].is_some();
let cwis = cells
.into_iter()
.map(|cell| {
cell.map(|cell| {
(
row_id,
ComponentWithInstances {
instance_keys: instance_keys.clone(), /* shallow */
values: cell,
},
)
// NOTE: This will return none for `TimeInt::Min`, i.e. range queries that start infinitely far
// into the past don't have a latest-at state!
let query_time = query.range.min.as_i64().checked_sub(1).map(Into::into);

let mut cwis_latest = None;
if let Some(query_time) = query_time {
let mut cwis_latest_raw: Vec<_> = std::iter::repeat_with(|| None)
.take(components.len())
.collect();

// Fetch the latest data for every single component from their respective point-of-views,
// this will allow us to build up the initial state and send an initial latest-at
// entity-view if needed.
for (i, primary) in components.iter().enumerate() {
cwis_latest_raw[i] = get_component_with_instances(
store,
&LatestAtQuery::new(query.timeline, query_time),
ent_path,
*primary,
)
.map(|(_, row_id, cwi)| (row_id, cwi));
}

if cwis_latest_raw[primary_col].is_some() {
cwis_latest = Some(cwis_latest_raw);
}
}

// send the latest-at state before anything else
cwis_latest
.into_iter()
.map(move |cwis| (query_time, true, cwis))
.chain(store.range(query, ent_path, components).map(
move |(data_time, row_id, mut cells)| {
// NOTE: The unwrap cannot fail, the cluster key's presence is guaranteed
// by the store.
let instance_keys = cells[cluster_col].take().unwrap();
let is_primary = cells[primary_col].is_some();
let cwis = cells
.into_iter()
.map(|cell| {
cell.map(|cell| {
(
row_id,
ComponentWithInstances {
instance_keys: instance_keys.clone(), /* shallow */
values: cell,
},
)
})
})
})
.collect::<Vec<_>>();
(data_time, is_primary, cwis)
})
.collect::<Vec<_>>();
(data_time, is_primary, cwis)
},
))
.filter_map(move |(data_time, is_primary, cwis)| {
for (i, cwi) in cwis
.into_iter()
Expand Down
Loading

0 comments on commit 6295c43

Please sign in to comment.