Skip to content

Commit

Permalink
New data APIs 4: cached latest-at mono helpers everywhere (#5606)
Browse files Browse the repository at this point in the history
Now that we have a component-based latest-at cache, we can start
replacing legacy uncached helpers with new ones.
Commit-by-commit review should be trivial.

Because the new APIs are designed with promises in mind, this already
highlights a whole bunch of places where we need to think about what to
do in case the data is not ready yet.
As indicated in #5607, these places have been labeled `TODO(#5607)` in
the code.
For now, we simply treat a pending promise the same as missing data.

This PR also adds the new `Caches` and `PromiseResolver` to the
`EntityDb`.
To run a cached query, you now need a `DataStore`, a `Caches` and a
`PromiseResolver`, i.e. you need an `EntityDb`.

---

Part of a PR series to completely revamp the data APIs in preparation
for the removal of instance keys and the introduction of promises:
- #5573
- #5574
- #5581
- #5605
- #5606
- #5633
- #5673
- #5679
- #5687
- #5755
- TODO
- TODO

Builds on top of the static data PR series:
- #5534
  • Loading branch information
teh-cmc authored Apr 8, 2024
1 parent efca999 commit 0b300fb
Show file tree
Hide file tree
Showing 38 changed files with 620 additions and 527 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/re_data_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ pub use self::arrow_util::ArrayExt;
pub use self::store::{DataStore, DataStoreConfig, StoreGeneration};
pub use self::store_event::{StoreDiff, StoreDiffKind, StoreEvent};
pub use self::store_gc::{GarbageCollectionOptions, GarbageCollectionTarget};
pub use self::store_helpers::VersionedComponent;
pub use self::store_read::{LatestAtQuery, RangeQuery};
pub use self::store_stats::{DataStoreRowStats, DataStoreStats, EntityStats};
pub use self::store_subscriber::{StoreSubscriber, StoreSubscriberHandle};
Expand Down
206 changes: 2 additions & 204 deletions crates/re_data_store/src/store_helpers.rs
Original file line number Diff line number Diff line change
@@ -1,209 +1,7 @@
use re_log_types::{DataCell, DataRow, EntityPath, RowId, TimeInt, TimePoint, Timeline};

use re_log_types::{DataCell, DataRow, EntityPath, RowId, TimePoint};
use re_types_core::{Component, ComponentName};

use crate::{DataStore, LatestAtQuery};

// --- Read ---

// TODO(cmc): these helpers have got to go once the new APIs land.

/// A [`Component`] at a specific _data_ time, versioned with a specific [`RowId`].
///
/// This is not enough to globally, uniquely identify an instance of a component.
/// For that you will need to combine the `InstancePath` that was used to query
/// the versioned component with the returned [`RowId`], therefore creating a
/// `VersionedInstancePath`.
#[derive(Debug, Clone)]
pub struct VersionedComponent<C: Component> {
pub data_time: TimeInt,
pub row_id: RowId,
pub value: C,
}

impl<C: Component> VersionedComponent<C> {
#[inline]
pub fn new(data_time: TimeInt, row_id: RowId, value: C) -> Self {
Self {
data_time,
row_id,
value,
}
}
}

impl<C: Component> std::ops::Deref for VersionedComponent<C> {
type Target = C;

#[inline]
fn deref(&self) -> &Self::Target {
&self.value
}
}

impl DataStore {
/// Get the latest value for a given [`re_types_core::Component`], as well as the associated
/// _data_ time and [`RowId`].
///
/// This assumes that the row we get from the store only contains a single instance for this
/// component; it will generate a log message of `level` otherwise.
///
/// This should only be used for "mono-components" such as `Transform` and `Tensor`.
///
/// This is a best-effort helper, it will merely log messages on failure.
pub fn query_latest_component_with_log_level<C: Component>(
&self,
entity_path: &EntityPath,
query: &LatestAtQuery,
level: re_log::Level,
) -> Option<VersionedComponent<C>> {
re_tracing::profile_function!();

let (data_time, row_id, cells) =
self.latest_at(query, entity_path, C::name(), &[C::name()])?;
let cell = cells.first()?.as_ref()?;

cell.try_to_native_mono::<C>()
.map_err(|err| {
if let re_log_types::DataCellError::LoggableDeserialize(err) = err {
let bt = err.backtrace().map(|mut bt| {
bt.resolve();
bt
});

let err = Box::new(err) as Box<dyn std::error::Error>;
if let Some(bt) = bt {
re_log::log_once!(
level,
"Couldn't deserialize component at {entity_path}#{}: {}\n{:#?}",
C::name(),
re_error::format(&err),
bt,
);
} else {
re_log::log_once!(
level,
"Couldn't deserialize component at {entity_path}#{}: {}",
C::name(),
re_error::format(&err)
);
}
return err;
}

let err = Box::new(err) as Box<dyn std::error::Error>;
re_log::log_once!(
level,
"Couldn't deserialize component at {entity_path}#{}: {}",
C::name(),
re_error::format(&err)
);

err
})
.ok()?
.map(|c| VersionedComponent::new(data_time, row_id, c))
}

/// Get the latest value for a given [`re_types_core::Component`], as well as the associated
/// _data_ time and [`RowId`].
///
/// This assumes that the row we get from the store only contains a single instance for this
/// component; it will log a warning otherwise.
///
/// This should only be used for "mono-components" such as `Transform` and `Tensor`.
///
/// This is a best-effort helper, it will merely log errors on failure.
#[inline]
pub fn query_latest_component<C: Component>(
&self,
entity_path: &EntityPath,
query: &LatestAtQuery,
) -> Option<VersionedComponent<C>> {
self.query_latest_component_with_log_level(entity_path, query, re_log::Level::Warn)
}

/// Get the latest value for a given [`re_types_core::Component`], as well as the associated
/// _data_ time and [`RowId`].
///
/// This assumes that the row we get from the store only contains a single instance for this
/// component; it will return None and log a debug message otherwise.
///
/// This should only be used for "mono-components" such as `Transform` and `Tensor`.
///
/// This is a best-effort helper, it will merely logs debug messages on failure.
#[inline]
pub fn query_latest_component_quiet<C: Component>(
&self,
entity_path: &EntityPath,
query: &LatestAtQuery,
) -> Option<VersionedComponent<C>> {
self.query_latest_component_with_log_level(entity_path, query, re_log::Level::Debug)
}

/// Call [`Self::query_latest_component`] at the given path, walking up the hierarchy until an instance is found.
pub fn query_latest_component_at_closest_ancestor<C: Component>(
&self,
entity_path: &EntityPath,
query: &LatestAtQuery,
) -> Option<(EntityPath, VersionedComponent<C>)> {
re_tracing::profile_function!();

let mut cur_path = Some(entity_path.clone());
while let Some(path) = cur_path {
if let Some(vc) = self.query_latest_component::<C>(&path, query) {
return Some((path, vc));
}
cur_path = path.parent();
}
None
}

/// Get the latest value for a given [`re_types_core::Component`] and the associated [`RowId`],
/// assuming it is static.
///
/// This assumes that the row we get from the store only contains a single instance for this
/// component; it will log a warning otherwise.
///
/// This should only be used for "mono-components" such as `Transform` and `Tensor`.
///
/// This is a best-effort helper, it will merely log errors on failure.
pub fn query_static_component<C: Component>(
&self,
entity_path: &EntityPath,
) -> Option<VersionedComponent<C>> {
re_tracing::profile_function!();

let query = LatestAtQuery::latest(Timeline::default());
self.query_latest_component(entity_path, &query).map(|vc| {
debug_assert!(vc.data_time.is_static());
vc
})
}

/// Get the latest value for a given [`re_types_core::Component`] and the associated [`RowId`],
/// assuming it is static.
///
/// This assumes that the row we get from the store only contains a single instance for this
/// component; it will return None and log a debug message otherwise.
///
/// This should only be used for "mono-components" such as `Transform` and `Tensor`.
///
/// This is a best-effort helper, it will merely log debug on failure.
pub fn query_static_component_quiet<C: Component>(
&self,
entity_path: &EntityPath,
) -> Option<VersionedComponent<C>> {
re_tracing::profile_function!();

let query = LatestAtQuery::latest(Timeline::default());
self.query_latest_component_quiet(entity_path, &query)
.map(|vc| {
debug_assert!(vc.data_time.is_static());
vc
})
}
}
use crate::DataStore;

// --- Write ---

Expand Down
37 changes: 25 additions & 12 deletions crates/re_data_store/tests/correctness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,24 @@ use re_types_core::Loggable as _;

// ---

fn query_latest_component<C: re_types_core::Component>(
store: &DataStore,
entity_path: &EntityPath,
query: &LatestAtQuery,
) -> Option<(TimeInt, RowId, C)> {
re_tracing::profile_function!();

let (data_time, row_id, cells) =
store.latest_at(query, entity_path, C::name(), &[C::name()])?;
let cell = cells.first()?.as_ref()?;

cell.try_to_native_mono::<C>()
.ok()?
.map(|c| (data_time, row_id, c))
}

// ---

#[test]
fn row_id_ordering_semantics() -> anyhow::Result<()> {
let entity_path: EntityPath = "some_entity".into();
Expand Down Expand Up @@ -60,10 +78,8 @@ fn row_id_ordering_semantics() -> anyhow::Result<()> {

{
let query = LatestAtQuery::new(timeline_frame, 11);
let got_point = store
.query_latest_component::<MyPoint>(&entity_path, &query)
.unwrap()
.value;
let (_, _, got_point) =
query_latest_component::<MyPoint>(&store, &entity_path, &query).unwrap();
similar_asserts::assert_eq!(point2, got_point);
}
}
Expand Down Expand Up @@ -129,10 +145,8 @@ fn row_id_ordering_semantics() -> anyhow::Result<()> {

{
let query = LatestAtQuery::new(timeline_frame, 11);
let got_point = store
.query_latest_component::<MyPoint>(&entity_path, &query)
.unwrap()
.value;
let (_, _, got_point) =
query_latest_component::<MyPoint>(&store, &entity_path, &query).unwrap();
similar_asserts::assert_eq!(point1, got_point);
}
}
Expand Down Expand Up @@ -170,10 +184,9 @@ fn row_id_ordering_semantics() -> anyhow::Result<()> {
store.insert_row(&row)?;

{
let got_point = store
.query_static_component::<MyPoint>(&entity_path)
.unwrap()
.value;
let query = LatestAtQuery::new(Timeline::new_temporal("doesnt_matter"), TimeInt::MAX);
let (_, _, got_point) =
query_latest_component::<MyPoint>(&store, &entity_path, &query).unwrap();
similar_asserts::assert_eq!(point1, got_point);
}
}
Expand Down
7 changes: 4 additions & 3 deletions crates/re_data_ui/src/annotation_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,12 @@ fn annotation_info(
query: &re_data_store::LatestAtQuery,
keypoint_id: KeypointId,
) -> Option<AnnotationInfo> {
// TODO(#5607): what should happen if the promise is still pending?
let class_id = ctx
.recording_store()
.query_latest_component::<re_types::components::ClassId>(entity_path, query)?;
.recording()
.latest_at_component::<re_types::components::ClassId>(entity_path, query)?;
let annotations = crate::annotations(ctx, query, entity_path);
let class = annotations.resolved_class_description(Some(*class_id));
let class = annotations.resolved_class_description(Some(class_id.value));
class.keypoint_map?.get(&keypoint_id).cloned()
}

Expand Down
12 changes: 7 additions & 5 deletions crates/re_data_ui/src/image.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,11 @@ impl EntityDataUi for re_types::components::TensorData {
) {
re_tracing::profile_function!();

// TODO(#5607): what should happen if the promise is still pending?
let tensor_data_row_id = ctx
.recording_store()
.query_latest_component::<re_types::components::TensorData>(entity_path, query)
.map_or(RowId::ZERO, |tensor| tensor.row_id);
.recording()
.latest_at_component::<re_types::components::TensorData>(entity_path, query)
.map_or(RowId::ZERO, |tensor| tensor.index.1);

let decoded = ctx
.cache
Expand Down Expand Up @@ -92,8 +93,9 @@ pub fn tensor_ui(
let meaning = image_meaning_for_entity(entity_path, query, store);

let meter = if meaning == TensorDataMeaning::Depth {
ctx.recording_store()
.query_latest_component::<DepthMeter>(entity_path, query)
// TODO(#5607): what should happen if the promise is still pending?
ctx.recording()
.latest_at_component::<DepthMeter>(entity_path, query)
.map(|meter| meter.value.0)
} else {
None
Expand Down
14 changes: 7 additions & 7 deletions crates/re_data_ui/src/item_ui.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,31 +162,31 @@ pub fn instance_path_icon(
/// timeline, then use the blueprint. Otherwise, use the recording.
// TODO(jleibs): Ideally this wouldn't be necessary and we could make the assessment
// directly from the entity_path.
pub fn guess_query_and_store_for_selected_entity<'a>(
pub fn guess_query_and_db_for_selected_entity<'a>(
ctx: &'a ViewerContext<'_>,
entity_path: &EntityPath,
) -> (re_data_store::LatestAtQuery, &'a re_data_store::DataStore) {
) -> (re_data_store::LatestAtQuery, &'a re_entity_db::EntityDb) {
if ctx.app_options.inspect_blueprint_timeline
&& ctx.store_context.blueprint.is_logged_entity(entity_path)
{
(
ctx.blueprint_cfg.time_ctrl.read().current_query(),
ctx.store_context.blueprint.store(),
ctx.store_context.blueprint,
)
} else {
(
ctx.rec_cfg.time_ctrl.read().current_query(),
ctx.recording_store(),
ctx.recording(),
)
}
}

pub fn guess_instance_path_icon(
ctx: &ViewerContext<'_>,
instance_path: &InstancePath,
) -> &'static icons::Icon {
let (query, store) = guess_query_and_store_for_selected_entity(ctx, &instance_path.entity_path);
instance_path_icon(&query.timeline(), store, instance_path)
) -> &'static re_ui::icons::Icon {
let (query, db) = guess_query_and_db_for_selected_entity(ctx, &instance_path.entity_path);
instance_path_icon(&query.timeline(), db.store(), instance_path)
}

/// Show an instance id and make it selectable.
Expand Down
2 changes: 2 additions & 0 deletions crates/re_entity_db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ re_log.workspace = true
re_log_encoding = { workspace = true, features = ["decoder"] }
re_log_types.workspace = true
re_query.workspace = true
re_query2.workspace = true
re_query_cache.workspace = true
re_query_cache2.workspace = true
re_smart_channel.workspace = true
re_tracing.workspace = true
re_types_core.workspace = true
Expand Down
Loading

0 comments on commit 0b300fb

Please sign in to comment.