Fix post-GC purging of streams view time histogram (#3364)
### What
* Closes #2517

The datastore is not the only place we store data. For each node in the
entity tree we store a _time histogram_ of where we have data. This was
never properly purged post-GC; instead a very rough heuristic was used (throw away everything up to the oldest time remaining in the store), which will become an even worse heuristic after
#3357.

With this PR, the store GC book-keeps exactly what was deleted, so that it can be properly purged from all secondary indices (see the sketch just below).
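As a rough illustration, here is a minimal sketch of how a caller might consume the new deletion report. The `GarbageCollectionOptions` and `Deleted` fields follow the diff in this PR; the time-histogram purge itself is only hinted at, and `purge_time_histogram` is a hypothetical helper, not an existing API:

```rust
// Minimal sketch (not part of this diff): consuming the `Deleted` report
// returned by `DataStore::gc` in order to purge secondary indices.
use re_arrow_store::{DataStore, GarbageCollectionOptions, GarbageCollectionTarget};

fn gc_and_purge(store: &mut DataStore) {
    let (deleted, _stats_diff) = store.gc(GarbageCollectionOptions {
        target: GarbageCollectionTarget::DropAtLeastFraction(1.0 / 3.0),
        gc_timeless: false,
        protect_latest: 0,
        purge_empty_tables: false,
    });

    // The exact set of dropped rows: no heuristics involved.
    for row_id in &deleted.row_ids {
        debug_assert!(store.get_msg_metadata(row_id).is_none());
    }

    // For each entity + timeline + component: the exact time points that were
    // dropped, ready to be subtracted from any secondary index.
    for (entity_path_hash, per_timeline) in &deleted.timeful {
        for (timeline, per_component) in per_timeline {
            for (component_name, times) in per_component {
                // Hypothetical helper, e.g. in the viewer's entity tree:
                // purge_time_histogram(entity_path_hash, timeline, component_name, times);
                let _ = (entity_path_hash, timeline, component_name, times);
            }
        }
    }
}
```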

The resulting code is a bit complicated, because the store knows nothing about the hierarchical nature of entity paths, while we keep the time histograms (and other book-keeping) per entity-tree node. That is, when logging to `/foo/bar` we note the data on `/`, `/foo`, and `/foo/bar`, but the store itself only registers the data under `/foo/bar` (a simplified sketch follows this paragraph).
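To make that concrete, here is a minimal, self-contained illustration of the hierarchical bookkeeping. It is not the actual viewer code; `NodePath`, `TimeHistogram`, `TimeHistogramPerNode`, and `purge` are all simplified, hypothetical stand-ins:

```rust
// Illustration only: the store reports deletions per concrete entity path,
// but every ancestor node keeps its own time histogram, so the purge has to
// walk up the tree.
use std::collections::BTreeMap;

type NodePath = Vec<String>; // e.g. ["foo", "bar"] stands in for `/foo/bar`
type TimeHistogram = BTreeMap<i64, u64>; // time -> number of entries at that time

#[derive(Default)]
struct TimeHistogramPerNode(BTreeMap<NodePath, TimeHistogram>);

impl TimeHistogramPerNode {
    /// Remove `times` that were logged at `path` from the histogram of `path`
    /// *and* of every ancestor (`/foo`, `/`), mirroring how they were added on ingest.
    fn purge(&mut self, path: &NodePath, times: &[i64]) {
        let mut node = path.clone();
        loop {
            if let Some(histogram) = self.0.get_mut(&node) {
                for &time in times {
                    if let Some(count) = histogram.get_mut(&time) {
                        *count = count.saturating_sub(1);
                        if *count == 0 {
                            histogram.remove(&time);
                        }
                    }
                }
            }
            if node.pop().is_none() {
                break; // the root `/` was just processed
            }
        }
    }
}
```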

I also fixed a bunch of other smaller things that came up.

Best reviewed commit-by-commit.

### Checklist
* [x] I have read and agree to [Contributor
Guide](https://github.com/rerun-io/rerun/blob/main/CONTRIBUTING.md) and
the [Code of
Conduct](https://github.com/rerun-io/rerun/blob/main/CODE_OF_CONDUCT.md)
* [x] I've included a screenshot or gif (if applicable)
* [x] I have tested [demo.rerun.io](https://demo.rerun.io/pr/3364) (if
applicable)

- [PR Build Summary](https://build.rerun.io/pr/3364)
- [Docs
preview](https://rerun.io/preview/18cbc53fe96798cbe45ab84ddeb66aa183253e12/docs)
<!--DOCS-PREVIEW-->
- [Examples
preview](https://rerun.io/preview/18cbc53fe96798cbe45ab84ddeb66aa183253e12/examples)
<!--EXAMPLES-PREVIEW-->
- [Recent benchmark results](https://ref.rerun.io/dev/bench/)
- [Wasm size tracking](https://ref.rerun.io/dev/sizes/)
emilk authored Sep 20, 2023
1 parent fb236a6 commit a91f415
Showing 14 changed files with 518 additions and 238 deletions.
2 changes: 1 addition & 1 deletion crates/re_arrow_store/src/lib.rs
@@ -37,7 +37,7 @@ pub mod test_util;

pub use self::arrow_util::ArrayExt;
pub use self::store::{DataStore, DataStoreConfig, StoreGeneration};
pub use self::store_gc::{GarbageCollectionOptions, GarbageCollectionTarget};
pub use self::store_gc::{Deleted, GarbageCollectionOptions, GarbageCollectionTarget};
pub use self::store_read::{LatestAtQuery, RangeQuery};
pub use self::store_stats::{DataStoreRowStats, DataStoreStats, EntityStats};
pub use self::store_write::{WriteError, WriteResult};
98 changes: 69 additions & 29 deletions crates/re_arrow_store/src/store_gc.rs
@@ -1,6 +1,7 @@
use ahash::{HashMap, HashSet};

use re_log_types::{RowId, SizeBytes as _, TimeInt, TimeRange};
use nohash_hasher::IntMap;
use re_log_types::{EntityPathHash, RowId, SizeBytes as _, TimeInt, TimeRange, Timeline};
use re_types::ComponentName;

use crate::{
@@ -58,6 +59,18 @@ impl std::fmt::Display for GarbageCollectionTarget {
}
}

#[derive(Default)]
pub struct Deleted {
/// What rows were deleted?
pub row_ids: HashSet<RowId>,

/// What time points were deleted for each entity+timeline+component?
pub timeful: IntMap<EntityPathHash, IntMap<Timeline, IntMap<ComponentName, Vec<TimeInt>>>>,

/// For each entity+component, how many timeless entries were deleted?
pub timeless: IntMap<EntityPathHash, IntMap<ComponentName, u64>>,
}

impl DataStore {
/// Triggers a garbage collection according to the desired `target`.
///
@@ -102,7 +115,7 @@ impl DataStore {
// when purging data.
//
// TODO(#1823): Workload specific optimizations.
pub fn gc(&mut self, options: GarbageCollectionOptions) -> (Vec<RowId>, DataStoreStats) {
pub fn gc(&mut self, options: GarbageCollectionOptions) -> (Deleted, DataStoreStats) {
re_tracing::profile_function!();

self.gc_id += 1;
@@ -114,7 +127,7 @@

let protected_rows = self.find_all_protected_rows(options.protect_latest);

let mut row_ids = match options.target {
let mut deleted = match options.target {
GarbageCollectionTarget::DropAtLeastFraction(p) => {
assert!((0.0..=1.0).contains(&p));

@@ -153,7 +166,7 @@
};

if options.purge_empty_tables {
row_ids.extend(self.purge_empty_tables());
deleted.row_ids.extend(self.purge_empty_tables());
}

#[cfg(debug_assertions)]
@@ -177,7 +190,7 @@

let stats_diff = stats_before - stats_after;

(row_ids, stats_diff)
(deleted, stats_diff)
}

/// Tries to drop _at least_ `num_bytes_to_drop` bytes of data from the store.
@@ -194,23 +207,20 @@
mut num_bytes_to_drop: f64,
include_timeless: bool,
protected_rows: &HashSet<RowId>,
) -> Vec<RowId> {
) -> Deleted {
re_tracing::profile_function!();

let mut row_ids = Vec::new();
let mut deleted = Deleted::default();

// The algorithm is straightforward:
// 1. Find the oldest `RowId` that is not protected
// 2. Find all tables that potentially hold data associated with that `RowId`
// 3. Drop the associated row and account for the space we got back

let mut candidate_rows = self.metadata_registry.registry.iter();

while num_bytes_to_drop > 0.0 {
// Try to get the next candidate
let Some((row_id, timepoint)) = candidate_rows.next() else {
for (row_id, timepoint) in &self.metadata_registry.registry {
if num_bytes_to_drop <= 0.0 {
break;
};
}

if protected_rows.contains(row_id) {
continue;
@@ -221,32 +231,40 @@
self.metadata_registry.heap_size_bytes -= metadata_dropped_size_bytes;
num_bytes_to_drop -= metadata_dropped_size_bytes as f64;

row_ids.push(*row_id);
deleted.row_ids.insert(*row_id);

// find all tables that could possibly contain this `RowId`
let temporal_tables = self.tables.iter_mut().filter_map(|((timeline, _), table)| {
timepoint.get(timeline).map(|time| (*time, table))
});

for (time, table) in temporal_tables {
num_bytes_to_drop -= table.try_drop_row(*row_id, time.as_i64()) as f64;
for ((timeline, ent_path_hash), table) in &mut self.tables {
if let Some(time) = timepoint.get(timeline) {
let deleted_comps = deleted
.timeful
.entry(*ent_path_hash)
.or_default()
.entry(*timeline)
.or_default();

num_bytes_to_drop -=
table.try_drop_row(*row_id, time.as_i64(), deleted_comps) as f64;
}
}

// TODO(jleibs): This is a worst-case removal-order. Would be nice to collect all the rows
// first and then remove them in one pass.
if timepoint.is_timeless() && include_timeless {
for table in self.timeless_tables.values_mut() {
num_bytes_to_drop -= table.try_drop_row(*row_id) as f64;
for (ent_path_hash, table) in &mut self.timeless_tables {
let deleted_comps = deleted.timeless.entry(*ent_path_hash).or_default();
num_bytes_to_drop -= table.try_drop_row(*row_id, deleted_comps) as f64;
}
}
}

// Purge the removed rows from the metadata_registry
for row_id in &row_ids {
for row_id in &deleted.row_ids {
self.metadata_registry.remove(row_id);
}

row_ids
deleted
}

/// For each `EntityPath`, `Timeline`, `Component` find the N latest [`RowId`]s.
@@ -403,15 +421,21 @@ impl IndexedTable {
/// specified `time`.
///
/// Returns how many bytes were actually dropped, or zero if the row wasn't found.
fn try_drop_row(&mut self, row_id: RowId, time: i64) -> u64 {
fn try_drop_row(
&mut self,
row_id: RowId,
time: i64,
deleted_comps: &mut IntMap<ComponentName, Vec<TimeInt>>,
) -> u64 {
re_tracing::profile_function!();
let table_has_more_than_one_bucket = self.buckets.len() > 1;

let (bucket_key, bucket) = self.find_bucket_mut(time.into());
let bucket_num_bytes = bucket.total_size_bytes();

let mut dropped_num_bytes = {
let inner = &mut *bucket.inner.write();
inner.try_drop_row(row_id, time)
inner.try_drop_row(row_id, time, deleted_comps)
};

// NOTE: We always need to keep at least one bucket alive, otherwise we have
@@ -447,7 +471,12 @@ impl IndexedBucketInner {
/// specified `time`.
///
/// Returns how many bytes were actually dropped, or zero if the row wasn't found.
fn try_drop_row(&mut self, row_id: RowId, time: i64) -> u64 {
fn try_drop_row(
&mut self,
row_id: RowId,
time: i64,
deleted_comps: &mut IntMap<ComponentName, Vec<TimeInt>>,
) -> u64 {
self.sort();

let IndexedBucketInner {
@@ -506,8 +535,12 @@ impl IndexedBucketInner {
dropped_num_bytes += col_num_instances.swap_remove(row_index).total_size_bytes();

// each data column
for column in columns.values_mut() {
for (comp_name, column) in columns {
dropped_num_bytes += column.0.swap_remove(row_index).total_size_bytes();
deleted_comps
.entry(*comp_name)
.or_default()
.push(time.into());
}

// NOTE: A single `RowId` cannot possibly have more than one datapoint for
@@ -525,7 +558,13 @@ impl PersistentIndexedTable {
/// Tries to drop the given `row_id` from the table.
///
/// Returns how many bytes were actually dropped, or zero if the row wasn't found.
fn try_drop_row(&mut self, row_id: RowId) -> u64 {
fn try_drop_row(
&mut self,
row_id: RowId,
deleted_comps: &mut IntMap<ComponentName, u64>,
) -> u64 {
re_tracing::profile_function!();

let mut dropped_num_bytes = 0u64;

let PersistentIndexedTable {
Expand Down Expand Up @@ -562,8 +601,9 @@ impl PersistentIndexedTable {
dropped_num_bytes += col_num_instances.remove(row_index).total_size_bytes();

// each data column
for column in columns.values_mut() {
for (comp_name, column) in columns {
dropped_num_bytes += column.0.remove(row_index).total_size_bytes();
*deleted_comps.entry(*comp_name).or_default() += 1;
}
}

10 changes: 1 addition & 9 deletions crates/re_arrow_store/src/store_read.rs
@@ -657,10 +657,6 @@ impl IndexedTable {
&mut self,
time_range: impl RangeBounds<TimeInt>,
) -> impl Iterator<Item = (TimeInt, &mut IndexedBucket)> {
// Beware! This merely measures the time it takes to gather all the necessary metadata
// for building the returned iterator.
re_tracing::profile_function!();

self.buckets
.range_mut(time_range)
.rev()
@@ -990,12 +986,8 @@ impl IndexedBucketInner {
re_tracing::profile_scope!("data");
// shuffle component columns back into a sorted state
for column in columns.values_mut() {
let mut source = {
re_tracing::profile_scope!("clone");
column.clone()
};
let mut source = column.clone();
{
re_tracing::profile_scope!("rotate");
for (from, to) in swaps.iter().copied() {
column[to] = source[from].take();
}
10 changes: 5 additions & 5 deletions crates/re_arrow_store/tests/correctness.rs
@@ -300,10 +300,10 @@ fn gc_correct() {

let stats = DataStoreStats::from_store(&store);

let (row_ids, stats_diff) = store.gc(GarbageCollectionOptions::gc_everything());
let (deleted, stats_diff) = store.gc(GarbageCollectionOptions::gc_everything());
let stats_diff = stats_diff + stats_empty; // account for fixed overhead

assert_eq!(row_ids.len() as u64, stats.total.num_rows);
assert_eq!(deleted.row_ids.len() as u64, stats.total.num_rows);
assert_eq!(
stats.metadata_registry.num_rows,
stats_diff.metadata_registry.num_rows
@@ -316,12 +316,12 @@

sanity_unwrap(&mut store);
check_still_readable(&store);
for row_id in &row_ids {
for row_id in &deleted.row_ids {
assert!(store.get_msg_metadata(row_id).is_none());
}

let (row_ids, stats_diff) = store.gc(GarbageCollectionOptions::gc_everything());
assert!(row_ids.is_empty());
let (deleted, stats_diff) = store.gc(GarbageCollectionOptions::gc_everything());
assert!(deleted.row_ids.is_empty());
assert_eq!(DataStoreStats::default(), stats_diff);

sanity_unwrap(&mut store);
4 changes: 2 additions & 2 deletions crates/re_arrow_store/tests/data_store.rs
@@ -880,13 +880,13 @@ fn gc_impl(store: &mut DataStore) {

let stats = DataStoreStats::from_store(store);

let (row_ids, stats_diff) = store.gc(GarbageCollectionOptions {
let (deleted, stats_diff) = store.gc(GarbageCollectionOptions {
target: GarbageCollectionTarget::DropAtLeastFraction(1.0 / 3.0),
gc_timeless: false,
protect_latest: 0,
purge_empty_tables: false,
});
for row_id in &row_ids {
for row_id in &deleted.row_ids {
assert!(store.get_msg_metadata(row_id).is_none());
}

14 changes: 5 additions & 9 deletions crates/re_crash_handler/src/lib.rs
@@ -45,15 +45,11 @@ fn install_panic_hook(_build_info: BuildInfo) {
.name()
.map_or_else(|| format!("{:?}", thread.id()), |name| name.to_owned());

let file_line_suffix = if let Some(file_line) = &file_line {
format!(", {file_line}")
} else {
String::new()
};

eprintln!(
"\nthread '{thread_name}' panicked at '{msg}'{file_line_suffix}\n\n{callstack}"
);
eprintln!("\nthread '{thread_name}' panicked at '{msg}'");
if let Some(file_line) = &file_line {
eprintln!("{file_line}");
}
eprintln!("stack backtrace:\n{callstack}");
} else {
// This prints the panic message and callstack:
(*previous_panic_hook)(panic_info);
