Skip to content

Commit

Permalink
DataStore changelog 6: event-driven EntityTree (#4209)
Browse files Browse the repository at this point in the history
Turns the `EntityTree` giga-datastructure into a `StoreView`, meaning it
now reacts to `StoreEvent`s rather than creating alternate truths.

This introduces the notion of cascading side-effects, and more
specifically `ClearCascade`s.
When the `EntityTree` reacts to changes in the store, this might cause
cascading effects (e.g. pending clears), that in turn need to write back
to the store, which in turn sends more events to react to!
The cycle is guaranteed finite because "clears don't get cleared"!

Cascading side-effects have an interesting requirement: they need to log
their cascaded data using a `RowId` _similar_ to the one used in the
original event that caused the cascade (so they get GC'd at roughly the
same pace).
"Similar" in this cases means that their `TUID` shares the same
timestamp and that the new `RowId` is strictly greater than the old one.

`PathOp` has finally been annihilated.

According to our new "Clears" & "Time Histograms" test suites, this
behaves exactly like the `main` branch.


---

`DataStore` changelog PR series:
- #4202
- #4203
- #4205
- #4206
- #4208
- #4209
  • Loading branch information
teh-cmc authored Nov 15, 2023
1 parent edab782 commit 176c08b
Show file tree
Hide file tree
Showing 12 changed files with 464 additions and 449 deletions.
9 changes: 5 additions & 4 deletions crates/re_arrow_store/src/store_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,14 @@ impl DataStore {
}
}

let mut diff = StoreDiff::addition(*row_id, ent_path.clone())
let diff = StoreDiff::addition(*row_id, ent_path.clone())
.at_timepoint(timepoint.clone())
.with_cells(cells.iter().cloned());

if let Some(cell) = generated_cluster_cell {
diff = diff.with_cells([cell]);
}
// TODO(#4220): should we fire for auto-generated data?
// if let Some(cell) = generated_cluster_cell {
// diff = diff.with_cells([cell]);
// }

let event = StoreEvent {
store_id: self.id.clone(),
Expand Down
512 changes: 306 additions & 206 deletions crates/re_data_store/src/entity_tree.rs

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion crates/re_data_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub use self::time_histogram_per_timeline::{TimeHistogram, TimeHistogramPerTimel
pub use self::times_per_timeline::{TimeCounts, TimesPerTimeline};
pub use self::versioned_instance_path::{VersionedInstancePath, VersionedInstancePathHash};

pub(crate) use self::entity_tree::CompactedStoreEvents;
pub(crate) use self::entity_tree::{ClearCascade, CompactedStoreEvents};

use re_log_types::DataTableError;
pub use re_log_types::{EntityPath, EntityPathPart, Index, TimeInt, Timeline};
Expand Down
262 changes: 104 additions & 158 deletions crates/re_data_store/src/store_db.rs

Large diffs are not rendered by default.

23 changes: 9 additions & 14 deletions crates/re_data_store/src/time_histogram_per_timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@ pub struct TimeHistogramPerTimeline {
/// When do we have data? Ignores timeless.
times: BTreeMap<Timeline, TimeHistogram>,

// TODO(cmc): pub(crate) is temporary while we turn StoreDb/EntityDb/EntityTree into event subscribers.
/// Extra book-keeping used to seed any timelines that include timeless msgs.
pub(crate) num_timeless_messages: u64,
num_timeless_messages: u64,
}

impl TimeHistogramPerTimeline {
Expand All @@ -42,46 +41,42 @@ impl TimeHistogramPerTimeline {
self.times.iter()
}

// TODO(cmc): temporary while we turn StoreDb/EntityDb/EntityTree into event subscribers.
#[inline]
pub fn iter_mut(&mut self) -> impl ExactSizeIterator<Item = (&Timeline, &mut TimeHistogram)> {
self.times.iter_mut()
}

#[inline]
pub fn num_timeless_messages(&self) -> u64 {
self.num_timeless_messages
}

pub fn add(&mut self, timepoint: &TimePoint) {
pub fn add(&mut self, timepoint: &TimePoint, n: u32) {
// If the `timepoint` is timeless…
if timepoint.is_timeless() {
self.num_timeless_messages += 1;
self.num_timeless_messages += n as u64;
} else {
for (timeline, time_value) in timepoint.iter() {
self.times
.entry(*timeline)
.or_default()
.increment(time_value.as_i64(), 1);
.increment(time_value.as_i64(), n);
}
}
}

pub fn remove(&mut self, timepoint: &TimePoint) {
pub fn remove(&mut self, timepoint: &TimePoint, n: u32) {
// If the `timepoint` is timeless…
if timepoint.is_timeless() {
self.num_timeless_messages -= 1;
self.num_timeless_messages -= n as u64;
} else {
for (timeline, time_value) in timepoint.iter() {
self.times
.entry(*timeline)
.or_default()
.decrement(time_value.as_i64(), 1);
.decrement(time_value.as_i64(), n);
}
}
}
}

// NOTE: This is only to let people know that this is in fact a [`StoreView`], so they A) don't try
// to implement it on their own and B) don't try to register it.
impl StoreView for TimeHistogramPerTimeline {
#[inline]
fn name(&self) -> String {
Expand Down
7 changes: 0 additions & 7 deletions crates/re_data_store/src/times_per_timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,6 @@ impl std::ops::Deref for TimesPerTimeline {
}
}

// TODO(cmc): temporary while we turn StoreDb/EntityDb/EntityTree into event subscribers.
impl std::ops::DerefMut for TimesPerTimeline {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}

impl TimesPerTimeline {
#[inline]
pub fn timelines(&self) -> impl ExactSizeIterator<Item = &Timeline> {
Expand Down
35 changes: 34 additions & 1 deletion crates/re_data_store/tests/clear.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,17 @@ use re_log_types::{
example_components::{MyColor, MyPoint},
DataRow, EntityPath, RowId, StoreId, TimePoint, Timeline,
};
use re_types_core::{archetypes::Clear, components::InstanceKey, AsComponents};
use re_types_core::{
archetypes::Clear,
components::{ClearIsRecursive, InstanceKey},
AsComponents,
};

/// Complete test suite for the clear & pending clear paths.
#[test]
fn clears() -> anyhow::Result<()> {
init_logs();

let mut db = StoreDb::new(StoreId::random(re_log_types::StoreKind::Recording));

let timeline_frame = Timeline::new_sequence("frame");
Expand Down Expand Up @@ -150,6 +156,13 @@ fn clears() -> anyhow::Result<()> {
.store()
.query_latest_component::<MyColor>(&entity_path_parent, &query)
.is_none());
// the `Clear` component itself doesn't get cleared!
let got_clear = db
.store()
.query_latest_component::<ClearIsRecursive>(&entity_path_parent, &query)
.unwrap()
.value;
similar_asserts::assert_eq!(clear.is_recursive, got_clear);

// child1
assert!(db
Expand Down Expand Up @@ -197,6 +210,13 @@ fn clears() -> anyhow::Result<()> {
.store()
.query_latest_component::<MyColor>(&entity_path_parent, &query)
.is_none());
// the `Clear` component itself doesn't get cleared!
let got_clear = db
.store()
.query_latest_component::<ClearIsRecursive>(&entity_path_parent, &query)
.unwrap()
.value;
similar_asserts::assert_eq!(clear.is_recursive, got_clear);

// child1
assert!(db
Expand Down Expand Up @@ -413,3 +433,16 @@ fn clears() -> anyhow::Result<()> {

Ok(())
}

// ---

pub fn init_logs() {
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::SeqCst;

static INIT: AtomicBool = AtomicBool::new(false);

if INIT.compare_exchange(false, true, SeqCst, SeqCst).is_ok() {
re_log::setup_native_logging();
}
}
7 changes: 1 addition & 6 deletions crates/re_data_store/tests/time_histograms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,14 +487,9 @@ fn time_histograms() -> anyhow::Result<()> {
&timeline_frame,
Some(&[
(RangeI64::new(42, 42), 5),
// TODO(cmc): This is wrong, it should be `4`.
//
// We're clearing the parent's `InstanceKey` as well as the grandchild's
// `MyPoint`, `MyColor` and `InstanceKey`. That's four.
//
// I have no idea where the extra increment comes from, but it doesn't
// really matter, the new event-driven EntityTree fixes it.
(RangeI64::new(1000, 1000), 5),
(RangeI64::new(1000, 1000), 4),
(RangeI64::new(1234, 1234), 3),
]),
),
Expand Down
21 changes: 1 addition & 20 deletions crates/re_data_ui/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use itertools::Itertools;

use re_log_types::{DataCell, EntityPath, PathOp, TimePoint};
use re_log_types::{DataCell, EntityPath, TimePoint};
use re_types::ComponentName;
use re_viewer_context::{UiVerbosity, ViewerContext};

Expand Down Expand Up @@ -177,25 +177,6 @@ fn format_cell(cell: &DataCell) -> String {
)
}

impl DataUi for PathOp {
fn data_ui(
&self,
_ctx: &mut ViewerContext<'_>,
ui: &mut egui::Ui,
_verbosity: UiVerbosity,
_query: &re_arrow_store::LatestAtQuery,
) {
match self {
PathOp::ClearComponents(entity_path) => {
ui.label(format!("ClearComponents: {entity_path}"))
}
PathOp::ClearRecursive(entity_path) => {
ui.label(format!("ClearRecursive: {entity_path}"))
}
};
}
}

// ---------------------------------------------------------------------------

pub fn annotations(
Expand Down
29 changes: 0 additions & 29 deletions crates/re_log_types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,35 +360,6 @@ impl std::fmt::Display for StoreSource {
}
}

// ----------------------------------------------------------------------------

/// Operation to perform on an [`EntityPath`], e.g. clearing all components.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub enum PathOp {
/// Clear all the components stored at an [`EntityPath`]
ClearComponents(EntityPath),

/// Clear all the components of an `[EntityPath]` and any descendants.
ClearRecursive(EntityPath),
}

impl PathOp {
pub fn clear(recursive: bool, entity_path: EntityPath) -> Self {
if recursive {
PathOp::ClearRecursive(entity_path)
} else {
PathOp::ClearComponents(entity_path)
}
}

pub fn entity_path(&self) -> &EntityPath {
match &self {
PathOp::ClearComponents(path) | PathOp::ClearRecursive(path) => path,
}
}
}

// ---

/// Build a ([`Timeline`], [`TimeInt`]) tuple from `log_time` suitable for inserting in a [`TimePoint`].
Expand Down
2 changes: 1 addition & 1 deletion crates/re_log_types/src/path/component_path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::path::EntityPath;
/// A [`EntityPath`] plus a [`ComponentName`].
///
/// Example: `camera / "left" / points / #42`.`color`
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub struct ComponentPath {
/// `camera / "left" / points / #42`
Expand Down
4 changes: 2 additions & 2 deletions crates/re_sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ pub mod sink {
/// Things directly related to logging.
pub mod log {
pub use re_log_types::{
DataCell, DataRow, DataTable, DataTableBatcher, DataTableBatcherConfig, LogMsg, PathOp,
RowId, TableId,
DataCell, DataRow, DataTable, DataTableBatcher, DataTableBatcherConfig, LogMsg, RowId,
TableId,
};
}

Expand Down

1 comment on commit 176c08b

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark 'Rust Benchmark'.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 1.25.

Benchmark suite Current: 176c08b Previous: edab782 Ratio
mono_points_arrow/generate_message_bundles 28918156 ns/iter (± 1326427) 22423690 ns/iter (± 1827673) 1.29
mono_points_arrow_batched/generate_message_bundles 19524748 ns/iter (± 1666811) 14457135 ns/iter (± 855458) 1.35

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.