Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Primary caching 16: context-free range semantics #4851

Merged
merged 5 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 42 additions & 9 deletions crates/re_data_store/src/polars_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,21 +123,54 @@ pub fn range_components<'a, const N: usize>(

let mut state = None;

// NOTE: This will return `None` for `TimeInt::Min`, i.e. range queries that start infinitely far
// into the past don't have a latest-at state!
let latest_time = query.range.min.as_i64().checked_sub(1).map(Into::into);

let mut df_latest = None;
if let Some(latest_time) = latest_time {
let df = latest_components(
store,
&LatestAtQuery::new(query.timeline, latest_time),
ent_path,
&components,
join_type,
);

df_latest = Some(df);
}

let primary_col = components
.iter()
.find_position(|component| **component == primary)
.map(|(col, _)| col)
.unwrap(); // asserted on entry

store
.range(query, ent_path, components)
.map(move |(time, _, cells)| {
(
time,
cells[primary_col].is_some(), // is_primary
dataframe_from_cells(&cells),
)
})
// send the latest-at state before anything else
df_latest
.into_iter()
// NOTE: `false` here means we will _not_ yield the latest-at state as an actual
// ArchetypeView!
// That is a very important detail: for overlapping range queries to be correct in a
// multi-tenant cache context, we need to make sure to inherit the latest-at state
// from T-1, while also making sure to _not_ yield the view that comes with that state.
//
// Consider e.g. what happens when one system queries for `range(10, 20)` while another
// queries for `range(9, 20)`: the data at timestamp `10` would differ because of the
// statefulness of range queries!
.map(move |df| (latest_time, false, df))
// followed by the range
.chain(
store
.range(query, ent_path, components)
.map(move |(time, _, cells)| {
(
time,
cells[primary_col].is_some(), // is_primary
dataframe_from_cells(&cells),
)
}),
)
.filter_map(move |(time, is_primary, df)| {
state = Some(join_dataframes(
cluster_key,
Expand Down
39 changes: 22 additions & 17 deletions crates/re_data_store/tests/data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -553,18 +553,18 @@ fn range_impl(store: &mut DataStore) {

// Unit ranges (Color's PoV)

// NOTE: Check out [1] to see what the results would've looked like with latest-at semantics at
// T-1 baked in (like we used to do).
//
// [1]: <https://github.com/rerun-io/rerun/blob/790f391/crates/re_data_store/tests/data_store.rs#L555-L837>

assert_range_components(
TimeRange::new(frame1, frame1),
[Color::name(), Position2D::name()],
&[(
Some(frame1),
&[(Color::name(), &row1)], //
)],
&[
(
Some(frame1),
&[
(Color::name(), &row1),
(Position2D::name(), &row4_4), // timeless
],
), //
],
);
assert_range_components(
TimeRange::new(frame2, frame2),
Expand All @@ -582,11 +582,11 @@ fn range_impl(store: &mut DataStore) {
&[
(
Some(frame4),
&[(Color::name(), &row4_1)], //
&[(Color::name(), &row4_1), (Position2D::name(), &row3)],
),
(
Some(frame4),
&[(Color::name(), &row4_2)], //
&[(Color::name(), &row4_2), (Position2D::name(), &row3)],
),
(
Some(frame4),
Expand All @@ -613,17 +613,19 @@ fn range_impl(store: &mut DataStore) {
&[
(
Some(frame2),
&[(Position2D::name(), &row2)], //
&[(Position2D::name(), &row2), (Color::name(), &row1)],
), //
],
);
assert_range_components(
TimeRange::new(frame3, frame3),
[Position2D::name(), Color::name()],
&[(
Some(frame3),
&[(Position2D::name(), &row3)], //
)],
&[
(
Some(frame3),
&[(Position2D::name(), &row3), (Color::name(), &row1)],
), //
],
);
assert_range_components(
TimeRange::new(frame4, frame4),
Expand Down Expand Up @@ -653,7 +655,10 @@ fn range_impl(store: &mut DataStore) {
&[
(
Some(frame1),
&[(Color::name(), &row1)], //
&[
(Color::name(), &row1),
(Position2D::name(), &row4_4), // timeless
],
),
(
Some(frame4),
Expand Down
87 changes: 63 additions & 24 deletions crates/re_query/src/range.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use itertools::Itertools as _;
use re_data_store::{DataStore, RangeQuery};
use re_data_store::{DataStore, LatestAtQuery, RangeQuery};
use re_log_types::EntityPath;
use re_types_core::{Archetype, ComponentName};

use crate::{ArchetypeView, ComponentWithInstances};
use crate::{get_component_with_instances, ArchetypeView, ComponentWithInstances};

// ---

Expand Down Expand Up @@ -61,29 +61,68 @@ pub fn range_archetype<'a, A: Archetype + 'a, const N: usize>(
.take(components.len())
.collect();

store
.range(query, ent_path, components)
.map(move |(data_time, row_id, mut cells)| {
// NOTE: The unwrap cannot fail, the cluster key's presence is guaranteed
// by the store.
let instance_keys = cells[cluster_col].take().unwrap();
let is_primary = cells[primary_col].is_some();
let cwis = cells
.into_iter()
.map(|cell| {
cell.map(|cell| {
(
row_id,
ComponentWithInstances {
instance_keys: instance_keys.clone(), /* shallow */
values: cell,
},
)
// NOTE: This will return `None` for `TimeInt::Min`, i.e. range queries that start infinitely far
// into the past don't have a latest-at state!
let query_time = query.range.min.as_i64().checked_sub(1).map(Into::into);

let mut cwis_latest = None;
if let Some(query_time) = query_time {
let mut cwis_latest_raw: Vec<_> = std::iter::repeat_with(|| None)
.take(components.len())
.collect();

// Fetch the latest data for every single component from its own point of view; this
// will allow us to build up the initial state and send an initial latest-at
// entity-view if needed.
for (i, primary) in components.iter().enumerate() {
cwis_latest_raw[i] = get_component_with_instances(
store,
&LatestAtQuery::new(query.timeline, query_time),
ent_path,
*primary,
)
.map(|(_, row_id, cwi)| (row_id, cwi));
}

cwis_latest = Some(cwis_latest_raw);
}

// send the latest-at state before anything else
cwis_latest
.into_iter()
// NOTE: `false` here means we will _not_ yield the latest-at state as an actual
// ArchetypeView!
// That is a very important detail: for overlapping range queries to be correct in a
// multi-tenant cache context, we need to make sure to inherit the latest-at state
// from T-1, while also making sure to _not_ yield the view that comes with that state.
//
// Consider e.g. what happens when one system queries for `range(10, 20)` while another
// queries for `range(9, 20)`: the data at timestamp `10` would differ because of the
// statefulness of range queries!
.map(move |cwis| (query_time, false, cwis))
.chain(store.range(query, ent_path, components).map(
move |(data_time, row_id, mut cells)| {
// NOTE: The unwrap cannot fail, the cluster key's presence is guaranteed
// by the store.
let instance_keys = cells[cluster_col].take().unwrap();
let is_primary = cells[primary_col].is_some();
let cwis = cells
.into_iter()
.map(|cell| {
cell.map(|cell| {
(
row_id,
ComponentWithInstances {
instance_keys: instance_keys.clone(), /* shallow */
values: cell,
},
)
})
})
})
.collect::<Vec<_>>();
(data_time, is_primary, cwis)
})
.collect::<Vec<_>>();
(data_time, is_primary, cwis)
},
))
.filter_map(move |(data_time, is_primary, cwis)| {
for (i, cwi) in cwis
.into_iter()
Expand Down
6 changes: 1 addition & 5 deletions crates/re_query/tests/archetype_range_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,6 @@ fn timeless_range() {

// --- First test: `(timepoint1, timepoint3]` ---

// The exclusion of `timepoint1` means latest-at semantics will kick in!

let query = re_data_store::RangeQuery::new(
timepoint1[0].0,
TimeRange::new((timepoint1[0].1.as_i64() + 1).into(), timepoint3[0].1),
Expand Down Expand Up @@ -416,7 +414,7 @@ fn timeless_range() {
Some(Position2D::new(1.0, 2.0)),
Some(Position2D::new(3.0, 4.0)),
];
let colors: Vec<Option<Color>> = vec![None, None];
let colors = vec![None, Some(Color::from_rgb(255, 0, 0))];
let expected = DataCellRow(smallvec![
DataCell::from_native_sparse(instances),
DataCell::from_native_sparse(positions),
Expand Down Expand Up @@ -731,8 +729,6 @@ fn simple_splatted_range() {

// --- Second test: `[timepoint1, timepoint3]` ---

// The inclusion of `timepoint1` means latest-at semantics will _not_ kick in!

let query = re_data_store::RangeQuery::new(
timepoint1[0].0,
TimeRange::new(timepoint1[0].1, timepoint3[0].1),
Expand Down
2 changes: 1 addition & 1 deletion tests/rust/plot_dashboard_stress/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ publish = false

[dependencies]
re_log = { workspace = true, features = ["setup"] }
rerun = { path = "../../../crates/rerun" }
rerun = { path = "../../../crates/rerun", features = ["clap"] }

anyhow = "1.0"
clap = { version = "4.0", features = ["derive"] }
Expand Down
Loading