Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DataStore changelog 3: introduce StoreViews #4205

Merged
merged 6 commits into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/re_arrow_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ document-features.workspace = true
indent.workspace = true
itertools = { workspace = true }
nohash-hasher.workspace = true
once_cell.workspace = true
parking_lot.workspace = true
smallvec.workspace = true
thiserror.workspace = true
Expand Down
2 changes: 2 additions & 0 deletions crates/re_arrow_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ mod store_helpers;
mod store_read;
mod store_sanity;
mod store_stats;
mod store_view;
mod store_write;

#[cfg(feature = "polars")]
Expand All @@ -43,6 +44,7 @@ pub use self::store_gc::{Deleted, GarbageCollectionOptions, GarbageCollectionTar
pub use self::store_helpers::VersionedComponent;
pub use self::store_read::{LatestAtQuery, RangeQuery};
pub use self::store_stats::{DataStoreRowStats, DataStoreStats, EntityStats};
pub use self::store_view::{StoreView, StoreViewHandle};
pub use self::store_write::{WriteError, WriteResult};

pub(crate) use self::store::{
Expand Down
8 changes: 5 additions & 3 deletions crates/re_arrow_store/src/store_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::StoreGeneration;

// Used all over in docstrings.
#[allow(unused_imports)]
use crate::DataStore;
use crate::{DataStore, StoreView};

// ---

Expand All @@ -18,6 +18,8 @@ use crate::DataStore;
///
/// Methods that mutate the [`DataStore`], such as [`DataStore::insert_row`] and [`DataStore::gc`],
/// return [`StoreEvent`]s that describe the changes.
/// You can also register your own [`StoreView`] in order to be notified of changes as soon as they
/// happen.
///
/// Refer to field-level documentation for more details and check out [`StoreDiff`] for a precise
/// definition of what an event involves.
Expand Down Expand Up @@ -239,11 +241,11 @@ mod tests {
};
use re_types_core::{components::InstanceKey, Loggable as _};

use crate::{DataStore, GarbageCollectionOptions};
use crate::{DataStore, GarbageCollectionOptions, StoreView, StoreViewHandle};

use super::*;

/// A simple store view for test purposes that keeps track of the quantity of data available
/// A simple store subscriber for test purposes that keeps track of the quantity of data available
/// in the store a the lowest level of detail.
///
/// The counts represent numbers of rows: e.g. how many unique rows contain this entity path?
Expand Down
12 changes: 8 additions & 4 deletions crates/re_arrow_store/src/store_gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,10 +222,14 @@ impl DataStore {
})
.collect();

if cfg!(debug_assertions) {
let any_event_other_than_deletion =
events.iter().any(|e| e.kind != StoreDiffKind::Deletion);
assert!(!any_event_other_than_deletion);
{
if cfg!(debug_assertions) {
let any_event_other_than_deletion =
events.iter().any(|e| e.kind != StoreDiffKind::Deletion);
assert!(!any_event_other_than_deletion);
}

Self::on_events(&events);
}

// TODO(cmc): Temporary, we'll return raw events soon, but need to rework EntityTree first.
Expand Down
3 changes: 3 additions & 0 deletions crates/re_arrow_store/src/store_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ use crate::{

// ---

// NOTE: Not implemented as a StoreView because it also measures implementation details of the
// store (buckets etc), while StoreEvents work at a data-model level.

#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, PartialOrd)]
pub struct DataStoreRowStats {
pub num_rows: u64,
Expand Down
254 changes: 254 additions & 0 deletions crates/re_arrow_store/src/store_view.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
use parking_lot::RwLock;

use crate::{DataStore, StoreEvent};

// ---

// TODO(cmc): Not sure why I need the extra Box here, RwLock should be `?Sized`.
type SharedStoreView = RwLock<Box<dyn StoreView>>;
Comment on lines +7 to +8
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looked it up and it is indeed https://docs.rs/lock_api/latest/lock_api/struct.RwLock.html
So it gotta be something else that's wrong here if you say we can't put the StoreView directly 🤔

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, looked it up too, and ended up with the same emotion: 🤔


/// A [`StoreView`] subscribes to atomic changes from all [`DataStore`]s through [`StoreEvent`]s.
///
/// [`StoreView`]s can be used to build both secondary indices and trigger systems.
//
// TODO(#4204): StoreView should require SizeBytes so they can be part of memstats.
pub trait StoreView: std::any::Any + Send + Sync {
/// Arbitrary name for the view.
///
/// Does not need to be unique.
fn name(&self) -> String;
Comment on lines +16 to +19
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the intended use? I'd guess it's a display name used for debugging?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wondering if this isn't just always a &'static str, but would get annoying eventually

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the intended use? I'd guess it's a display name used for debugging?

Yeah, just a good habit for things to be named IME. Especially plugin-y stuff like this that ends up living all over the place.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wondering if this isn't just always a &'static str, but would get annoying eventually

I would much prefer not forcing people to just through hoops if they need to format!() something (and we can turn to a Cow later if the string is a performance concern (highly unlikely)).


/// Workaround for downcasting support, simply return `self`:
/// ```ignore
/// fn as_any(&self) -> &dyn std::any::Any {
/// self
/// }
/// ```
fn as_any(&self) -> &dyn std::any::Any;

/// Workaround for downcasting support, simply return `self`:
/// ```ignore
/// fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
/// self
/// }
/// ```
fn as_any_mut(&mut self) -> &mut dyn std::any::Any;

/// The core of this trait: get notified of changes happening in all [`DataStore`]s.
///
/// This will be called automatically by the [`DataStore`] itself if the view has been
/// registered: [`DataStore::register_view`].
/// Or you might want to feed it [`StoreEvent`]s manually, depending on your use case.
///
/// ## Example
///
/// ```ignore
/// fn on_events(&mut self, events: &[StoreEvent]) {
/// use re_arrow_store::StoreDiffKind;
/// for event in events {
/// match event.kind {
/// StoreDiffKind::Addition => println!("Row added: {}", event.row_id),
/// StoreDiffKind::Deletion => println!("Row removed: {}", event.row_id),
/// }
/// }
/// }
/// ```
fn on_events(&mut self, events: &[StoreEvent]);
}

/// All registered [`StoreView`]s.
static VIEWS: once_cell::sync::Lazy<RwLock<Vec<SharedStoreView>>> =
once_cell::sync::Lazy::new(|| RwLock::new(Vec::new()));

#[derive(Debug, Clone, Copy)]
pub struct StoreViewHandle(u32);

impl DataStore {
/// Registers a [`StoreView`] so it gets automatically notified when data gets added and/or
/// removed to/from a [`DataStore`].
///
/// Refer to [`StoreEvent`]'s documentation for more information about these events.
///
/// ## Scope
///
/// Registered [`StoreView`]s are global scope: they get notified of all events from all
/// existing [`DataStore`]s, including [`DataStore`]s created after the view was registered.
///
/// Use [`StoreEvent::store_id`] to identify the source of an event.
///
/// ## Late registration
///
/// Views must be registered before a store gets created to guarantee that no events were
/// missed.
///
/// [`StoreEvent::event_id`] can be used to identify missing events.
///
/// ## Ordering
///
/// The order in which registered views are notified is undefined and will likely become
/// concurrent in the future.
///
/// If you need a specific order across multiple views, embed them into an orchestrating view.
//
// TODO(cmc): send a compacted snapshot to late registerers for bootstrapping
pub fn register_view(view: Box<dyn StoreView>) -> StoreViewHandle {
let mut views = VIEWS.write();
views.push(RwLock::new(view));
StoreViewHandle(views.len() as u32 - 1)
}

/// Passes a reference to the downcasted view to the given callback.
///
/// Returns `None` if the view doesn't exist or downcasting failed.
pub fn with_view<V: StoreView, T, F: FnMut(&V) -> T>(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't understand why it's called with_ it's not like I get a DataStore with some handle. More like view or get_view?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume this is because it doesn't return the view, but rather takes a closure that is executed with the view as an argument. As in "With this view, run this function". That said, I also find with_ confusing here. Maybe for_?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mostly based this on the thread_local API from the stdlib: https://doc.rust-lang.org/std/thread/struct.LocalKey.html#examples

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll keep it as is for now as it feels somewhat consistent with std at least... definitely feel free to change it later if you come up with anything better.

StoreViewHandle(handle): StoreViewHandle,
mut f: F,
) -> Option<T> {
let views = VIEWS.read();
views.get(handle as usize).and_then(|view| {
let view = view.read();
view.as_any().downcast_ref::<V>().map(&mut f)
})
}

/// Passes a mutable reference to the downcasted view to the given callback.
///
/// Returns `None` if the view doesn't exist or downcasting failed.
pub fn with_view_mut<V: StoreView, T, F: FnMut(&mut V) -> T>(
StoreViewHandle(handle): StoreViewHandle,
mut f: F,
) -> Option<T> {
let views = VIEWS.read();
views.get(handle as usize).and_then(|view| {
let mut view = view.write();
view.as_any_mut().downcast_mut::<V>().map(&mut f)
})
}

/// Called by [`DataStore`]'s mutating methods to notify view subscribers of upcoming events.
pub(crate) fn on_events(events: &[StoreEvent]) {
let views = VIEWS.read();
// TODO(cmc): might want to parallelize at some point.
for view in views.iter() {
view.write().on_events(events);
}
}
}

#[cfg(tests)]
mod tests {
use std::collections::BTreeMap;

use re_log_types::{
example_components::{MyColor, MyPoint, MyPoints},
DataRow, DataTable, EntityPath, RowId, TableId, Time, TimePoint, Timeline,
};
use re_types_core::{components::InstanceKey, Loggable as _};

use crate::{DataStore, GarbageCollectionOptions, StoreView, StoreViewHandle};

use super::*;

/// A simple [`StoreView`] for test purposes that just accumulates [`StoreEvent`]s.
#[derive(Default, Debug)]
struct AllEvents {
events: Vec<StoreEvent>,
}

impl StoreView for AllEvents {
fn name(&self) -> String {
"rerun.testing.store_views.AllEvents".into()
}

fn as_any(&self) -> &dyn std::any::Any {
self
}

fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
self
}

fn on_events(&mut self, events: &[StoreEvent]) {
self.events.extend(events.to_owned());
}
}

#[test]
fn store_view() -> anyhow::Result<()> {
let mut store1 = DataStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
InstanceKey::name(),
Default::default(),
);
let mut store2 = DataStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
InstanceKey::name(),
Default::default(),
);

let mut expected_events = Vec::new();

let view_handle = DataStore::register_view(Box::<AllEvents>::default());

let timeline_frame = Timeline::new_sequence("frame");
let timeline_other = Timeline::new_temporal("other");
let timeline_yet_another = Timeline::new_sequence("yet_another");

let row = DataRow::from_component_batches(
RowId::random(),
TimePoint::from_iter([
(timeline_frame, 42.into()), //
(timeline_other, 666.into()), //
(timeline_yet_another, 1.into()), //
]),
"entity_a".into(),
[&InstanceKey::from_iter(0..10) as _],
)?;

expected_events.extend(store1.insert_row(&row));

let row = {
let num_instances = 3;
let points: Vec<_> = (0..num_instances)
.map(|i| MyPoint::new(0.0, i as f32))
.collect();
let colors = vec![MyColor::from(0xFF0000FF)];
DataRow::from_component_batches(
RowId::random(),
TimePoint::from_iter([
(timeline_frame, 42.into()), //
(timeline_yet_another, 1.into()), //
]),
"entity_b".into(),
[&points as _, &colors as _],
)?
};

expected_events.extend(store2.insert_row(&row));

let row = {
let num_instances = 6;
let colors = vec![MyColor::from(0x00DD00FF); num_instances];
DataRow::from_component_batches(
RowId::random(),
TimePoint::timeless(),
"entity_b".into(),
[
&InstanceKey::from_iter(0..num_instances as _) as _,
&colors as _,
],
)?
};

expected_events.extend(store1.insert_row(&row));

expected_events.extend(store1.gc(GarbageCollectionOptions::gc_everything()).0);
expected_events.extend(store2.gc(GarbageCollectionOptions::gc_everything()).0);

DataStore::with_view::<AllEvents, _, _>(view_handle, |got| {
similar_asserts::assert_eq!(expected_events, got.events);
});

Ok(())
}
}
14 changes: 13 additions & 1 deletion crates/re_arrow_store/src/store_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use re_types_core::{

use crate::{
DataStore, DataStoreConfig, IndexedBucket, IndexedBucketInner, IndexedTable, MetadataRegistry,
PersistentIndexedTable, StoreDiff, StoreEvent,
PersistentIndexedTable, StoreDiff, StoreDiffKind, StoreEvent,
};

// --- Data store ---
Expand Down Expand Up @@ -202,6 +202,18 @@ impl DataStore {
diff,
};

{
let events = &[event.clone()];

if cfg!(debug_assertions) {
let any_event_other_than_addition =
events.iter().any(|e| e.kind != StoreDiffKind::Addition);
assert!(!any_event_other_than_addition);
}

Self::on_events(events);
}

Ok(event)
}

Expand Down
Loading