Skip to content

Commit

Permalink
Automatic removal of unreachable static chunks (#7518)
Browse files Browse the repository at this point in the history
Unreachable static chunks are dead weight: there exists no query that
can access their data (at least when using Rerun as a Visualizer).

By automatically removing dangling chunks, we make it possible for user
to use the Rerun Viewer as a soft-realtime telemetry system (provided we
properly invalidate our caches too, which is the subject of an upcoming
PR).

This raises the question of what should happen when using Rerun as a
database: should this data be kept and made accessible?
If so, this behavior should probably be made configurable (e.g. when
instantiating a ChunkStore in the SDK).

* Part of #7404 

---

Test:
```python
from pathlib import Path

import rerun as rr

image_file_path = Path(__file__).parent / "ferris.png"

rr.init("rerun_example_encoded_image", spawn=True)

for _ in range(0, 10):
    rr.log("image", rr.EncodedImage(path=image_file_path), static=True)
```

Command:
```
RERUN_FLUSH_NUM_ROWS=0 python test.py
```

Before:

![image](https://github.com/user-attachments/assets/a9823055-5247-4d63-9295-fc310aa4923f)

After:

![image](https://github.com/user-attachments/assets/c095fda2-02ac-4456-bf3f-e251b24dc75d)
  • Loading branch information
teh-cmc authored Sep 26, 2024
1 parent bd87d4c commit ca44717
Show file tree
Hide file tree
Showing 2 changed files with 178 additions and 15 deletions.
188 changes: 173 additions & 15 deletions crates/store/re_chunk_store/src/writes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ impl ChunkStore {

let row_id_range_per_component = chunk.row_id_range_per_component();

let mut overwritten_chunk_ids = HashMap::default();

for (&component_name, list_array) in chunk.components() {
let is_empty = list_array
.validity()
Expand All @@ -79,16 +81,25 @@ impl ChunkStore {
// NOTE: When attempting to overwrite static data, the chunk with the most
// recent data within -- according to RowId -- wins.

let cur_row_id_max = self.chunks_per_chunk_id.get(cur_chunk_id).map_or(
RowId::ZERO,
|chunk| {
let (cur_row_id_min, cur_row_id_max) = self
.chunks_per_chunk_id
.get(cur_chunk_id)
.map_or((RowId::ZERO, RowId::ZERO), |chunk| {
chunk
.row_id_range_per_component()
.get(&component_name)
.map_or(RowId::ZERO, |(_, row_id_max)| *row_id_max)
},
);
.map_or(
(RowId::ZERO, RowId::ZERO),
|(row_id_min, row_id_max)| (*row_id_min, *row_id_max),
)
});
if *row_id_max > cur_row_id_max {
// We are about to overwrite the existing chunk with the new one, at
// least for this one specific component.
// Keep track of the overwritten ChunkId: we'll need it further down in
// order to check whether that chunk is now dangling.
overwritten_chunk_ids.insert(*cur_chunk_id, cur_row_id_min);

*cur_chunk_id = chunk.id();
}
})
Expand All @@ -97,13 +108,55 @@ impl ChunkStore {

self.static_chunks_stats += ChunkStoreChunkStats::from_chunk(chunk);

(
Arc::clone(chunk),
vec![ChunkStoreDiff::addition(
non_compacted_chunk, /* added */
None, /* compacted */
)],
)
let mut diffs = vec![ChunkStoreDiff::addition(
non_compacted_chunk, /* added */
None, /* compacted */
)];

// NOTE(1): Our chunks can only cover a single entity path at a time, therefore we know we
// only have to check that one entity for complete overwrite.
// NOTE(2): This condition cannot fail, we just want to avoid unwrapping.
if let Some(per_component) = self.static_chunk_ids_per_entity.get(chunk.entity_path()) {
re_tracing::profile_scope!("static dangling checks");

// At this point, we are in possession of a list of ChunkIds that were at least
// _partially_ overwritten (i.e. some, but not necessarily all, of the components
// that they used to provide the data for are now provided by another, newer chunk).
//
// To determine whether any of these chunks are actually fully overwritten, and
// therefore dangling, we need to make sure there are no components left
// referencing these ChunkIds whatsoever.
//
// Because our storage model guarantees that a single chunk cannot cover more than
// one entity, this is actually pretty cheap to do, since we only have to loop over
// all the components of a single entity.

for (chunk_id, chunk_row_id_min) in overwritten_chunk_ids {
let has_been_fully_overwritten = !per_component
.values()
.any(|cur_chunk_id| *cur_chunk_id == chunk_id);

if has_been_fully_overwritten {
// The chunk is now dangling: remove it from all relevant indices, update
// the stats, and fire deletion events.

let chunk_id_removed =
self.chunk_ids_per_min_row_id.remove(&chunk_row_id_min);
debug_assert!(chunk_id_removed.is_some());

let chunk_removed = self.chunks_per_chunk_id.remove(&chunk_id);
debug_assert!(chunk_removed.is_some());

if let Some(chunk_removed) = chunk_removed {
self.static_chunks_stats -=
ChunkStoreChunkStats::from_chunk(&chunk_removed);
diffs.push(ChunkStoreDiff::deletion(chunk_removed));
}
}
}
}

(Arc::clone(chunk), diffs)
} else {
// Temporal data: just index the chunk on every dimension of interest.
re_tracing::profile_scope!("temporal");
Expand Down Expand Up @@ -525,8 +578,11 @@ impl ChunkStore {

#[cfg(test)]
mod tests {
use re_chunk::Timeline;
use re_log_types::example_components::MyPoint;
use re_chunk::{TimePoint, Timeline};
use re_log_types::example_components::{MyColor, MyLabel, MyPoint};
use similar_asserts::assert_eq;

use crate::ChunkStoreDiffKind;

use super::*;

Expand Down Expand Up @@ -644,4 +700,106 @@ mod tests {

Ok(())
}

#[test]
fn static_overwrites() -> anyhow::Result<()> {
re_log::setup_logging();

let mut store = ChunkStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
Default::default(),
);

let entity_path = EntityPath::from("this/that");

let row_id1_1 = RowId::new();
let row_id2_1 = RowId::new();
let row_id2_2 = RowId::new();

let timepoint_static = TimePoint::default();

let points1 = &[MyPoint::new(1.0, 1.0)];
let colors1 = &[MyColor::from_rgb(1, 1, 1)];
let labels1 = &[MyLabel("111".to_owned())];

let points2 = &[MyPoint::new(2.0, 2.0)];
let colors2 = &[MyColor::from_rgb(2, 2, 2)];
let labels2 = &[MyLabel("222".to_owned())];

let chunk1 = Chunk::builder(entity_path.clone())
.with_component_batches(
row_id1_1,
timepoint_static.clone(),
[points1 as _, colors1 as _, labels1 as _],
)
.build()?;
let chunk2 = Chunk::builder(entity_path.clone())
.with_component_batches(
row_id2_1,
timepoint_static.clone(),
[points2 as _, colors2 as _],
)
.build()?;
let chunk3 = Chunk::builder(entity_path.clone())
.with_component_batches(row_id2_2, timepoint_static, [labels2 as _])
.build()?;

let chunk1 = Arc::new(chunk1);
let chunk2 = Arc::new(chunk2);
let chunk3 = Arc::new(chunk3);

let events = store.insert_chunk(&chunk1)?;
assert!(
events.len() == 1
&& events[0].chunk.id() == chunk1.id()
&& events[0].kind == ChunkStoreDiffKind::Addition,
"the first write should result in the addition of chunk1 and nothing else"
);

let events = store.insert_chunk(&chunk2)?;
assert!(
events.len() == 1
&& events[0].chunk.id() == chunk2.id()
&& events[0].kind == ChunkStoreDiffKind::Addition,
"the second write should result in the addition of chunk2 and nothing else"
);

let stats_before = store.stats();
{
let ChunkStoreChunkStats {
num_chunks,
total_size_bytes: _,
num_rows,
num_events,
} = stats_before.static_chunks;
assert_eq!(2, num_chunks);
assert_eq!(2, num_rows);
assert_eq!(5, num_events);
}

let events = store.insert_chunk(&chunk3)?;
assert!(
events.len() == 2
&& events[0].chunk.id() == chunk3.id()
&& events[0].kind == ChunkStoreDiffKind::Addition
&& events[1].chunk.id() == chunk1.id()
&& events[1].kind == ChunkStoreDiffKind::Deletion,
"the final write should result in the addition of chunk3 _and_ the deletion of the now fully overwritten chunk1"
);

let stats_after = store.stats();
{
let ChunkStoreChunkStats {
num_chunks,
total_size_bytes: _,
num_rows,
num_events,
} = stats_after.static_chunks;
assert_eq!(2, num_chunks);
assert_eq!(2, num_rows);
assert_eq!(3, num_events);
}

Ok(())
}
}
5 changes: 5 additions & 0 deletions crates/store/re_entity_db/src/entity_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,11 @@ impl EntityDb {
self.query_caches.on_events(&store_events);
self.tree.on_store_additions(&store_events);

// It is possible for writes to trigger deletions: specifically in the case of
// overwritten static data leading to dangling chunks.
self.tree
.on_store_deletions(&self.data_store, &store_events);

// We inform the stats last, since it measures e2e latency.
self.stats.on_events(&store_events);
}
Expand Down

0 comments on commit ca44717

Please sign in to comment.