Skip to content

Commit

Permalink
archetype queries now always return the data time too
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc committed Jan 5, 2024
1 parent fea5eba commit bcc7ab9
Show file tree
Hide file tree
Showing 21 changed files with 96 additions and 78 deletions.
10 changes: 8 additions & 2 deletions crates/re_data_store/src/store_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,10 @@ impl DataStore {
/// let components = &[cluster_key, primary];
/// let (_, cells) = store
/// .latest_at(&query, ent_path, primary, components)
/// .unwrap_or((RowId::ZERO, [(); 2].map(|_| None)));
/// .map_or_else(
/// || (RowId::ZERO, [(); 2].map(|_| None)),
/// |(_, row_id, cells)| (row_id, cells),
/// );
///
/// let series: Result<Vec<_>, _> = cells
/// .iter()
Expand Down Expand Up @@ -436,7 +439,10 @@ impl DataStore {
/// let query = LatestAtQuery::new(query.timeline, latest_time);
/// let (_, cells) = store
/// .latest_at(&query, ent_path, primary, &components)
/// .unwrap_or((RowId::ZERO, [(); 2].map(|_| None)));
/// .map_or_else(
/// || (RowId::ZERO, [(); 2].map(|_| None)),
/// |(_, row_id, cells)| (row_id, cells),
/// );
/// dataframe_from_cells(cells)
/// };
///
Expand Down
2 changes: 1 addition & 1 deletion crates/re_data_ui/src/component_path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl DataUi for ComponentPath {
ui.label(format!(
"Indicator component for the {archetype_name} archetype"
));
} else if let Some((_, component_data)) =
} else if let Some((_, _, component_data)) =
re_query::get_component_with_instances(store, query, entity_path, *component_name)
{
super::component::EntityComponentWithInstances {
Expand Down
2 changes: 1 addition & 1 deletion crates/re_data_ui/src/instance_path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl DataUi for InstancePath {
UiVerbosity::LimitHeight | UiVerbosity::Full => {}
}

let Some((_, component_data)) =
let Some((_, _, component_data)) =
get_component_with_instances(store, query, entity_path, component_name)
else {
continue; // no need to show components that are unset at this point in time
Expand Down
4 changes: 2 additions & 2 deletions crates/re_query/benches/query_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ fn query_and_visit_points(store: &DataStore, paths: &[EntityPath]) -> Vec<SavePo

// TODO(jleibs): Add Radius once we have support for it in field_types
for path in paths {
let arch_view = query_archetype::<Points2D>(store, &query, path).unwrap();
let (_, arch_view) = query_archetype::<Points2D>(store, &query, path).unwrap();
itertools::izip!(
arch_view.iter_required_component::<Position2D>().unwrap(),
arch_view.iter_optional_component::<Color>().unwrap()
Expand All @@ -299,7 +299,7 @@ fn query_and_visit_strings(store: &DataStore, paths: &[EntityPath]) -> Vec<SaveS
let mut strings = Vec::with_capacity(NUM_STRINGS as _);

for path in paths {
let arch_view = query_archetype::<Points2D>(store, &query, path).unwrap();
let (_, arch_view) = query_archetype::<Points2D>(store, &query, path).unwrap();
arch_view
.iter_optional_component::<Text>()
.unwrap()
Expand Down
1 change: 1 addition & 0 deletions crates/re_query/src/archetype_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ where
/// the required [`Component`]s using [`InstanceKey`] values.
#[derive(Clone, Debug)]
pub struct ArchetypeView<A: Archetype> {
// pub(crate) primary_data_time: Option<TimeInt>,
pub(crate) primary_row_id: RowId,
pub(crate) components: BTreeMap<ComponentName, ComponentWithInstances>,
pub(crate) phantom: PhantomData<A>,
Expand Down
31 changes: 19 additions & 12 deletions crates/re_query/src/query.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use re_data_store::{DataStore, LatestAtQuery};
use re_log_types::{EntityPath, RowId};
use re_log_types::{EntityPath, RowId, TimeInt};
use re_types_core::{components::InstanceKey, Archetype, ComponentName, Loggable};

use crate::{ArchetypeView, ComponentWithInstances, QueryError};
Expand Down Expand Up @@ -53,14 +53,16 @@ pub fn get_component_with_instances(
query: &LatestAtQuery,
ent_path: &EntityPath,
component: ComponentName,
) -> Option<(RowId, ComponentWithInstances)> {
) -> Option<(Option<TimeInt>, RowId, ComponentWithInstances)> {
debug_assert_eq!(store.cluster_key(), InstanceKey::name());

let components = [InstanceKey::name(), component];

let (_, row_id, mut cells) = store.latest_at(query, ent_path, component, &components)?;
let (data_time, row_id, mut cells) =
store.latest_at(query, ent_path, component, &components)?;

Some((
data_time,
row_id,
ComponentWithInstances {
// NOTE: The unwrap cannot fail, the cluster key's presence is guaranteed
Expand Down Expand Up @@ -119,7 +121,7 @@ pub fn query_archetype<A: Archetype>(
store: &DataStore,
query: &LatestAtQuery,
ent_path: &EntityPath,
) -> crate::Result<ArchetypeView<A>> {
) -> crate::Result<(Option<TimeInt>, ArchetypeView<A>)> {
re_tracing::profile_function!();

let required_components: Vec<_> = A::required_components()
Expand All @@ -138,9 +140,11 @@ pub fn query_archetype<A: Archetype>(
}
}

let (row_ids, required_components): (Vec<_>, Vec<_>) =
required_components.into_iter().flatten().unzip();
use itertools::Itertools as _;
let (data_times, row_ids, required_components): (Vec<_>, Vec<_>, Vec<_>) =
required_components.into_iter().flatten().multiunzip();

let data_time = data_times.first().unwrap_or(&None);
let row_id = row_ids.first().unwrap_or(&RowId::ZERO);

let recommended_components = A::recommended_components();
Expand All @@ -151,12 +155,15 @@ pub fn query_archetype<A: Archetype>(
.chain(optional_components.iter())
.filter_map(|component| {
get_component_with_instances(store, query, ent_path, *component)
.map(|(_, component_result)| component_result)
.map(|(_, _, component_result)| component_result)
});

Ok(ArchetypeView::from_components(
*row_id,
required_components.into_iter().chain(other_components),
Ok((
*data_time,
ArchetypeView::from_components(
*row_id,
required_components.into_iter().chain(other_components),
),
))
}

Expand Down Expand Up @@ -218,7 +225,7 @@ fn simple_get_component() {
let ent_path = "point";
let query = LatestAtQuery::new(Timeline::new_sequence("frame_nr"), 123.into());

let (_, component) =
let (_, _, component) =
get_component_with_instances(&store, &query, &ent_path.into(), MyPoint::name()).unwrap();

#[cfg(feature = "polars")]
Expand Down Expand Up @@ -253,7 +260,7 @@ fn simple_query_archetype() {
let ent_path = "point";
let query = LatestAtQuery::new(Timeline::new_sequence("frame_nr"), 123.into());

let arch_view = query_archetype::<MyPoints>(&store, &query, &ent_path.into()).unwrap();
let (_, arch_view) = query_archetype::<MyPoints>(&store, &query, &ent_path.into()).unwrap();

let expected_positions = [MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)];
let expected_colors = [None, Some(MyColor::from(0xff000000))];
Expand Down
61 changes: 30 additions & 31 deletions crates/re_query/src/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,10 @@ pub fn range_archetype<'a, A: Archetype + 'a, const N: usize>(

// NOTE: This will return none for `TimeInt::Min`, i.e. range queries that start infinitely far
// into the past don't have a latest-at state!
let latest_time = query.range.min.as_i64().checked_sub(1).map(Into::into);
let query_time = query.range.min.as_i64().checked_sub(1).map(Into::into);

let mut cwis_latest = None;
if let Some(latest_time) = latest_time {
if let Some(query_time) = query_time {
let mut cwis_latest_raw: Vec<_> = std::iter::repeat_with(|| None)
.take(components.len())
.collect();
Expand All @@ -77,10 +77,11 @@ pub fn range_archetype<'a, A: Archetype + 'a, const N: usize>(
for (i, primary) in components.iter().enumerate() {
cwis_latest_raw[i] = get_component_with_instances(
store,
&LatestAtQuery::new(query.timeline, latest_time),
&LatestAtQuery::new(query.timeline, query_time),
ent_path,
*primary,
);
)
.map(|(_, row_id, cwi)| (row_id, cwi));
}

if cwis_latest_raw[primary_col].is_some() {
Expand All @@ -91,33 +92,31 @@ pub fn range_archetype<'a, A: Archetype + 'a, const N: usize>(
// send the latest-at state before anything else
cwis_latest
.into_iter()
.map(move |cwis| (latest_time, true, cwis))
.chain(
store
.range(query, ent_path, components)
.map(move |(time, row_id, mut cells)| {
// NOTE: The unwrap cannot fail, the cluster key's presence is guaranteed
// by the store.
let instance_keys = cells[cluster_col].take().unwrap();
let is_primary = cells[primary_col].is_some();
let cwis = cells
.into_iter()
.map(|cell| {
cell.map(|cell| {
(
row_id,
ComponentWithInstances {
instance_keys: instance_keys.clone(), /* shallow */
values: cell,
},
)
})
.map(move |cwis| (query_time, true, cwis))
.chain(store.range(query, ent_path, components).map(
move |(data_time, row_id, mut cells)| {
// NOTE: The unwrap cannot fail, the cluster key's presence is guaranteed
// by the store.
let instance_keys = cells[cluster_col].take().unwrap();
let is_primary = cells[primary_col].is_some();
let cwis = cells
.into_iter()
.map(|cell| {
cell.map(|cell| {
(
row_id,
ComponentWithInstances {
instance_keys: instance_keys.clone(), /* shallow */
values: cell,
},
)
})
.collect::<Vec<_>>();
(time, is_primary, cwis)
}),
)
.filter_map(move |(time, is_primary, cwis)| {
})
.collect::<Vec<_>>();
(data_time, is_primary, cwis)
},
))
.filter_map(move |(data_time, is_primary, cwis)| {
for (i, cwi) in cwis
.into_iter()
.enumerate()
Expand All @@ -138,7 +137,7 @@ pub fn range_archetype<'a, A: Archetype + 'a, const N: usize>(

let arch_view = ArchetypeView::from_components(row_id, components);

(time, arch_view)
(data_time, arch_view)
})
})
}
4 changes: 2 additions & 2 deletions crates/re_query/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub fn query_archetype_with_history<'a, A: Archetype + 'a, const N: usize>(
time: &'a TimeInt,
history: &ExtraQueryHistory,
ent_path: &'a EntityPath,
) -> crate::Result<impl Iterator<Item = ArchetypeView<A>> + 'a> {
) -> crate::Result<impl Iterator<Item = (Option<TimeInt>, ArchetypeView<A>)> + 'a> {
let visible_history = match timeline.typ() {
re_log_types::TimeType::Time => history.nanos,
re_log_types::TimeType::Sequence => history.sequences,
Expand All @@ -30,6 +30,6 @@ pub fn query_archetype_with_history<'a, A: Archetype + 'a, const N: usize>(

let range = range_archetype::<A, N>(store, &range_query, ent_path);

Ok(itertools::Either::Right(range.map(|(_, entity)| entity)))
Ok(itertools::Either::Right(range))
}
}
15 changes: 10 additions & 5 deletions crates/re_query/tests/archetype_query_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ fn simple_query() {
// Retrieve the view
let timeline_query = re_data_store::LatestAtQuery::new(timepoint[0].0, timepoint[0].1);

let arch_view = query_archetype::<Points2D>(&store, &timeline_query, &ent_path.into()).unwrap();
let (_, arch_view) =
query_archetype::<Points2D>(&store, &timeline_query, &ent_path.into()).unwrap();

// We expect this to generate the following `DataFrame`
// ┌──────────┬───────────┬────────────┐
Expand Down Expand Up @@ -105,7 +106,8 @@ fn timeless_query() {
// Retrieve the view
let timeline_query = re_data_store::LatestAtQuery::new(timepoint[0].0, timepoint[0].1);

let arch_view = query_archetype::<Points2D>(&store, &timeline_query, &ent_path.into()).unwrap();
let (_, arch_view) =
query_archetype::<Points2D>(&store, &timeline_query, &ent_path.into()).unwrap();

// We expect this to generate the following `DataFrame`
// ┌──────────┬───────────┬────────────┐
Expand Down Expand Up @@ -167,7 +169,8 @@ fn no_instance_join_query() {
// Retrieve the view
let timeline_query = re_data_store::LatestAtQuery::new(timepoint[0].0, timepoint[0].1);

let arch_view = query_archetype::<Points2D>(&store, &timeline_query, &ent_path.into()).unwrap();
let (_, arch_view) =
query_archetype::<Points2D>(&store, &timeline_query, &ent_path.into()).unwrap();

// We expect this to generate the following `DataFrame`
// ┌──────────┬───────────┬────────────┐
Expand Down Expand Up @@ -227,7 +230,8 @@ fn missing_column_join_query() {
// Retrieve the view
let timeline_query = re_data_store::LatestAtQuery::new(timepoint[0].0, timepoint[0].1);

let arch_view = query_archetype::<Points2D>(&store, &timeline_query, &ent_path.into()).unwrap();
let (_, arch_view) =
query_archetype::<Points2D>(&store, &timeline_query, &ent_path.into()).unwrap();

// We expect this to generate the following `DataFrame`
//
Expand Down Expand Up @@ -296,7 +300,8 @@ fn splatted_query() {
// Retrieve the view
let timeline_query = re_data_store::LatestAtQuery::new(timepoint[0].0, timepoint[0].1);

let arch_view = query_archetype::<Points2D>(&store, &timeline_query, &ent_path.into()).unwrap();
let (_, arch_view) =
query_archetype::<Points2D>(&store, &timeline_query, &ent_path.into()).unwrap();

// We expect this to generate the following `DataFrame`
// ┌──────────┬───────────┬────────────┐
Expand Down
11 changes: 6 additions & 5 deletions crates/re_query_cache/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ macro_rules! impl_query_archetype {

for (time, arch_view) in arch_views {
let data = (
// TODO(cmc): `ArchetypeView` should indicate its pov time.
// TODO(cmc): actual timeless caching support.
(time.unwrap_or(TimeInt::MIN), arch_view.primary_row_id()),
MaybeCachedComponentData::Raw(arch_view.iter_instance_keys().collect()),
$(MaybeCachedComponentData::Raw(arch_view.iter_required_component::<$pov>()?.collect()),)+
Expand All @@ -126,11 +126,11 @@ macro_rules! impl_query_archetype {
AnyQuery::LatestAt(query) if !cached => {
re_tracing::profile_scope!("latest_at", format!("{query:?}"));

let arch_view = ::re_query::query_archetype::<A>(store, query, entity_path)?;
let (data_time, arch_view) = ::re_query::query_archetype::<A>(store, query, entity_path)?;

let data = (
// TODO(cmc): `ArchetypeView` should indicate its pov time.
(TimeInt::MIN, arch_view.primary_row_id()),
// TODO(cmc): actual timeless caching support.
(data_time.unwrap_or(TimeInt::MIN), arch_view.primary_row_id()),
MaybeCachedComponentData::Raw(arch_view.iter_instance_keys().collect()),
$(MaybeCachedComponentData::Raw(arch_view.iter_required_component::<$pov>()?.collect()),)+
$(MaybeCachedComponentData::Raw(arch_view.iter_optional_component::<$comp>()?.collect()),)*
Expand All @@ -155,7 +155,8 @@ macro_rules! impl_query_archetype {

if bucket.is_empty() {
let now = web_time::Instant::now();
let arch_view = query_archetype::<A>(store, &query, entity_path)?;
// TODO(cmc): cache deduplication.
let (_data_time, arch_view) = query_archetype::<A>(store, &query, entity_path)?;

bucket.[<insert_pov $N _comp$M>]::<A, $($pov,)+ $($comp,)*>(query.at, &arch_view)?;

Expand Down
2 changes: 1 addition & 1 deletion crates/re_query_cache/tests/latest_at.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ fn query_and_compare(store: &DataStore, query: &LatestAtQuery, ent_path: &Entity
)
.unwrap();

let expected = re_query::query_archetype::<Points2D>(store, query, ent_path).unwrap();
let (_, expected) = re_query::query_archetype::<Points2D>(store, query, ent_path).unwrap();

let expected_instance_keys = expected.iter_instance_keys().collect_vec();
let expected_positions = expected
Expand Down
4 changes: 2 additions & 2 deletions crates/re_space_view_dataframe/src/space_view_class.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ impl SpaceViewClass for DataframeSpaceView {
// TODO(#4466): make it explicit if that value results
// from a splat joint.

if let Some((_, comp_inst)) =
if let Some((_, _, comp_inst)) =
// This is a duplicate of the one above, but this ok since this codes runs
// *only* for visible rows.
get_component_with_instances(
Expand Down Expand Up @@ -243,7 +243,7 @@ fn sorted_instance_paths_for<'a>(
.filter(|comp| !comp.is_indicator_component())
.flat_map(|comp| {
get_component_with_instances(store, latest_at_query, entity_path, comp)
.map(|(_, comp_inst)| comp_inst.instance_keys())
.map(|(_, _, comp_inst)| comp_inst.instance_keys())
.unwrap_or_default()
})
.filter(|instance_key| !instance_key.is_splat())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,18 +80,18 @@ where
&data_result.accumulated_properties().visible_history,
&data_result.entity_path,
)
.and_then(|entity_views| {
for ent_view in entity_views {
.and_then(|arch_views| {
for (_, arch_view) in arch_views {
counter.num_primitives.fetch_add(
ent_view.num_instances(),
arch_view.num_instances(),
std::sync::atomic::Ordering::Relaxed,
);

fun(
ctx,
&data_result.entity_path,
data_result.accumulated_properties(),
ent_view,
arch_view,
&entity_context,
)?;
}
Expand Down
Loading

0 comments on commit bcc7ab9

Please sign in to comment.