Skip to content

Commit

Permalink
DataStore changelog 5: event-driven time histograms (#4208)
Browse files Browse the repository at this point in the history
This is mostly preliminary work for #4209, which makes this PR a bit
weird. Basically just trying to offload complexity from #4209.

`TimesPerTimeline` as well as `TimeHistogramPerTimeline` are now living
on their own and are maintained as `StoreView`s, i.e. they react to
changes to the `DataStore` rather than constructing alternate truths.

This is the first step towards turning the `EntityTree` giga-structure
into an event-driven view in the next PR.

---

`DataStore` changelog PR series:
- #4202
- #4203
- #4205
- #4206
- #4208
- #4209
  • Loading branch information
teh-cmc authored Nov 15, 2023
1 parent 412f6c0 commit edab782
Show file tree
Hide file tree
Showing 16 changed files with 348 additions and 288 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/re_arrow_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub mod test_util;
pub use self::arrow_util::ArrayExt;
pub use self::store::{DataStore, DataStoreConfig, StoreGeneration};
pub use self::store_event::{StoreDiff, StoreDiffKind, StoreEvent};
pub use self::store_gc::{Deleted, GarbageCollectionOptions, GarbageCollectionTarget};
pub use self::store_gc::{GarbageCollectionOptions, GarbageCollectionTarget};
pub use self::store_helpers::VersionedComponent;
pub use self::store_read::{LatestAtQuery, RangeQuery};
pub use self::store_stats::{DataStoreRowStats, DataStoreStats, EntityStats};
Expand Down
50 changes: 3 additions & 47 deletions crates/re_arrow_store/src/store_gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ use std::collections::BTreeMap;

use ahash::{HashMap, HashSet};

use nohash_hasher::IntMap;
use re_log_types::{EntityPath, EntityPathHash, RowId, TimeInt, TimeRange, Timeline};
use re_log_types::{EntityPath, RowId, TimeInt, TimeRange, Timeline};
use re_types_core::{ComponentName, SizeBytes as _};

use crate::{
Expand Down Expand Up @@ -61,48 +60,6 @@ impl std::fmt::Display for GarbageCollectionTarget {
}
}

#[derive(Default)]
pub struct Deleted {
/// What rows were deleted?
pub row_ids: HashSet<RowId>,

/// What time points were deleted for each entity+timeline+component?
pub timeful: IntMap<EntityPathHash, IntMap<Timeline, IntMap<ComponentName, Vec<TimeInt>>>>,

/// For each entity+component, how many timeless entries were deleted?
pub timeless: IntMap<EntityPathHash, IntMap<ComponentName, u64>>,
}

impl Deleted {
pub fn new(store_events: &[StoreEvent]) -> Self {
let mut this = Deleted {
row_ids: store_events.iter().map(|event| event.row_id).collect(),
timeful: Default::default(),
timeless: Default::default(),
};

for event in store_events {
if event.is_timeless() {
let per_component = this.timeless.entry(event.entity_path.hash()).or_default();
for component_name in event.cells.keys() {
*per_component.entry(*component_name).or_default() +=
event.delta().unsigned_abs();
}
} else {
for (&timeline, &time) in &event.timepoint {
let per_timeline = this.timeful.entry(event.entity_path.hash()).or_default();
let per_component = per_timeline.entry(timeline).or_default();
for component_name in event.cells.keys() {
per_component.entry(*component_name).or_default().push(time);
}
}
}
}

this
}
}

impl DataStore {
/// Triggers a garbage collection according to the desired `target`.
///
Expand Down Expand Up @@ -135,7 +92,7 @@ impl DataStore {
/// points in time may provide different results pre- and post- GC.
//
// TODO(#1823): Workload specific optimizations.
pub fn gc(&mut self, options: GarbageCollectionOptions) -> (Deleted, DataStoreStats) {
pub fn gc(&mut self, options: GarbageCollectionOptions) -> (Vec<StoreEvent>, DataStoreStats) {
re_tracing::profile_function!();

self.gc_id += 1;
Expand Down Expand Up @@ -232,8 +189,7 @@ impl DataStore {
Self::on_events(&events);
}

// TODO(cmc): Temporary, we'll return raw events soon, but need to rework EntityTree first.
(Deleted::new(&events), stats_diff)
(events, stats_diff)
}

/// Tries to drop _at least_ `num_bytes_to_drop` bytes of data from the store.
Expand Down
11 changes: 5 additions & 6 deletions crates/re_arrow_store/tests/correctness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,10 +485,9 @@ fn gc_correct() {

let stats = DataStoreStats::from_store(&store);

let (deleted, stats_diff) = store.gc(GarbageCollectionOptions::gc_everything());
let (store_events, stats_diff) = store.gc(GarbageCollectionOptions::gc_everything());
let stats_diff = stats_diff + stats_empty; // account for fixed overhead

assert_eq!(deleted.row_ids.len() as u64, stats.total.num_rows);
assert_eq!(
stats.metadata_registry.num_rows,
stats_diff.metadata_registry.num_rows
Expand All @@ -501,12 +500,12 @@ fn gc_correct() {

sanity_unwrap(&mut store);
check_still_readable(&store);
for row_id in &deleted.row_ids {
assert!(store.get_msg_metadata(row_id).is_none());
for event in store_events {
assert!(store.get_msg_metadata(&event.row_id).is_none());
}

let (deleted, stats_diff) = store.gc(GarbageCollectionOptions::gc_everything());
assert!(deleted.row_ids.is_empty());
let (store_events, stats_diff) = store.gc(GarbageCollectionOptions::gc_everything());
assert!(store_events.is_empty());
assert_eq!(DataStoreStats::default(), stats_diff);

sanity_unwrap(&mut store);
Expand Down
6 changes: 3 additions & 3 deletions crates/re_arrow_store/tests/data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -926,14 +926,14 @@ fn gc_impl(store: &mut DataStore) {

let stats = DataStoreStats::from_store(store);

let (deleted, stats_diff) = store.gc(GarbageCollectionOptions {
let (store_events, stats_diff) = store.gc(GarbageCollectionOptions {
target: GarbageCollectionTarget::DropAtLeastFraction(1.0 / 3.0),
gc_timeless: false,
protect_latest: 0,
purge_empty_tables: false,
});
for row_id in &deleted.row_ids {
assert!(store.get_msg_metadata(row_id).is_none());
for event in store_events {
assert!(store.get_msg_metadata(&event.row_id).is_none());
}

// NOTE: only temporal data and row metadata get purged!
Expand Down
1 change: 1 addition & 0 deletions crates/re_data_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ re_smart_channel.workspace = true
re_tracing.workspace = true
re_types_core.workspace = true

ahash.workspace = true
document-features.workspace = true
getrandom.workspace = true
itertools.workspace = true
Expand Down
Loading

0 comments on commit edab782

Please sign in to comment.