Static data 1: static-aware datastore, caches and queries (#5535)
Introduces the concept of static data into the data APIs.

Static data is stored on a per-entity, per-component basis. If it exists, it
unconditionally shadows any temporal data of the same type. It is never
garbage collected.
When static data is returned, this is indicated via `TimeInt::STATIC`.
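As a concrete illustration of the shadowing rule, here's a minimal sketch using toy stand-in types (none of these are the actual `re_data_store` types or APIs):

```rust
use std::collections::{BTreeMap, HashMap};

type Entity = &'static str;
type Component = &'static str;
type TimeInt = i64;

// Stand-in for `TimeInt::STATIC`.
const STATIC: TimeInt = i64::MIN;

struct Store {
    // Static: keyed only by (entity, component) -- no time index, never GC'd.
    static_cells: HashMap<(Entity, Component), String>,
    // Temporal: additionally keyed by time.
    temporal: HashMap<(Entity, Component), BTreeMap<TimeInt, String>>,
}

impl Store {
    /// Latest-at semantics: a static cell, if present, unconditionally
    /// shadows all temporal data for that (entity, component).
    fn latest_at(&self, at: TimeInt, e: Entity, c: Component) -> Option<(TimeInt, &String)> {
        if let Some(v) = self.static_cells.get(&(e, c)) {
            return Some((STATIC, v)); // reported as `TimeInt::STATIC`
        }
        self.temporal
            .get(&(e, c))?
            .range(..=at)
            .next_back()
            .map(|(t, v)| (*t, v))
    }
}
```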

The terminology has been normalized all over the place: data is either
static or temporal, and nothing else.

Static data cannot have more than one cell per entity per component.
Writing more than one cell triggers last-write-wins semantics, as
defined by `RowId` ordering.
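A sketch of that rule (again with toy types; the real `RowId` is a globally-ordered TUID, but it compares the same way):

```rust
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct RowId(u64);

struct StaticCell {
    row_id: RowId,
    value: String,
}

/// Last-write-wins: keep whichever write carries the greater `RowId`,
/// regardless of the order in which the writes arrive.
fn insert_static(slot: &mut Option<StaticCell>, incoming: StaticCell) {
    match slot {
        Some(existing) if existing.row_id >= incoming.row_id => {} // drop the write
        _ => *slot = Some(incoming),
    }
}
```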

Timeless fallbacks simply don't exist anymore, which removes _a lot_
of code from the datastore and query cache.

Note: static data is in many subtle ways incompatible with our legacy
InstanceKey-based model, which results in a couple hacks in this PR.
Those hacks will be gone as soon as the new data APIs land and instance
keys go away.

- Fixes #5264
- Fixes #2074
- Fixes #5447
- Fixes #1766


---

Part of a PR series that removes the concept of timeless data in favor
of the much simpler concept of static data:
- #5534
- #5535
- #5536
- #5537
- #5540
teh-cmc authored Apr 5, 2024
1 parent ee51fe8 commit 08109da
Showing 64 changed files with 1,173 additions and 2,268 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered by default.)

3 changes: 2 additions & 1 deletion crates/re_data_source/src/data_loader/mod.rs
@@ -16,7 +16,8 @@ use re_log_types::{ArrowMsg, DataRow, EntityPath, LogMsg, TimePoint};
 /// * `--recording-id <store_id>`
 /// * `--opened-recording-id <opened_store_id>` (if set)
 /// * `--entity-path-prefix <entity_path_prefix>` (if set)
-/// * `--timeless` (if `timepoint` is set to the timeless timepoint)
+/// * `--static` (if `timepoint` is set to the timeless timepoint)
+/// * `--timeless` \[deprecated\] (if `timepoint` is set to the timeless timepoint)
 /// * `--time <timeline1>=<time1> <timeline2>=<time2> ...` (if `timepoint` contains temporal data)
 /// * `--sequence <timeline1>=<seq1> <timeline2>=<seq2> ...` (if `timepoint` contains sequence data)
 #[derive(Debug, Clone)]
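For context, an external data loader might detect the new flag along these lines (hypothetical parsing code, not part of this diff; only the flag names come from the doc comment above):

```rust
/// Returns true if the data produced by this loader should be logged as static.
fn is_static(args: &[String]) -> bool {
    // `--timeless` is still forwarded for backwards compatibility, but deprecated.
    args.iter().any(|a| a == "--static" || a == "--timeless")
}
```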
13 changes: 5 additions & 8 deletions crates/re_data_store/benches/data_store.rs
@@ -304,7 +304,6 @@ fn range(c: &mut Criterion) {
         b.iter(|| {
             let rows = range_data(&store, [LargeStruct::name()]);
             for (cur_time, (time, cells)) in rows.enumerate() {
-                let time = time.unwrap();
                 assert_eq!(cur_time as i64, time.as_i64());

                 let large_structs = cells[0]
@@ -339,7 +338,6 @@ fn gc(c: &mut Criterion) {
                 let mut store = store.clone();
                 let (_, stats_diff) = store.gc(&GarbageCollectionOptions {
                     target: GarbageCollectionTarget::DropAtLeastFraction(1.0 / 3.0),
-                    gc_timeless: false,
                     protect_latest: 0,
                     purge_empty_tables: false,
                     dont_protect: Default::default(),
@@ -364,7 +362,6 @@
                 let mut store = store.clone();
                 let (_, stats_diff) = store.gc(&GarbageCollectionOptions {
                     target: GarbageCollectionTarget::DropAtLeastFraction(1.0 / 3.0),
-                    gc_timeless: false,
                     protect_latest: 0,
                     purge_empty_tables: false,
                     dont_protect: Default::default(),
@@ -454,22 +451,22 @@ fn latest_data_at<const N: usize>(
 ) -> [Option<DataCell>; N] {
     let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence);
     let timeline_query = LatestAtQuery::new(timeline_frame_nr, NUM_ROWS / 2);
-    let ent_path = EntityPath::from("large_structs");
+    let entity_path = EntityPath::from("large_structs");

     store
-        .latest_at(&timeline_query, &ent_path, primary, secondaries)
+        .latest_at(&timeline_query, &entity_path, primary, secondaries)
         .map_or_else(|| [(); N].map(|_| None), |(_, _, cells)| cells)
 }

 fn range_data<const N: usize>(
     store: &DataStore,
     components: [ComponentName; N],
-) -> impl Iterator<Item = (Option<TimeInt>, [Option<DataCell>; N])> + '_ {
+) -> impl Iterator<Item = (TimeInt, [Option<DataCell>; N])> + '_ {
     let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence);
     let query = RangeQuery::new(timeline_frame_nr, TimeRange::new(TimeInt::ZERO, NUM_ROWS));
-    let ent_path = EntityPath::from("large_structs");
+    let entity_path = EntityPath::from("large_structs");

     store
-        .range(&query, &ent_path, components)
+        .range(&query, &entity_path, components)
         .map(move |(time, _, cells)| (time, cells))
 }
84 changes: 1 addition & 83 deletions crates/re_data_store/benches/gc.rs
@@ -13,7 +13,7 @@ use re_log_types::{
 use re_types::components::InstanceKey;
 use re_types_core::{AsComponents, ComponentBatch, ComponentName, Loggable as _};

-criterion_group!(benches, plotting_dashboard, timeless_logs);
+criterion_group!(benches, plotting_dashboard);
 criterion_main!(benches);

 // ---
@@ -64,7 +64,6 @@ fn plotting_dashboard(c: &mut Criterion) {

     let gc_settings = GarbageCollectionOptions {
         target: GarbageCollectionTarget::DropAtLeastFraction(DROP_AT_LEAST),
-        gc_timeless: true,
         protect_latest: 1,
         purge_empty_tables: false,
         dont_protect: Default::default(),
@@ -139,87 +138,6 @@
     }
 }
-
-fn timeless_logs(c: &mut Criterion) {
-    const DROP_AT_LEAST: f64 = 0.3;
-
-    let mut group = c.benchmark_group(format!(
-        "datastore/num_entities={NUM_ENTITY_PATHS}/num_rows_per_entity={NUM_ROWS_PER_ENTITY_PATH}/timeless_logs/drop_at_least={DROP_AT_LEAST}"
-    ));
-    group.throughput(criterion::Throughput::Elements(
-        ((NUM_ENTITY_PATHS * NUM_ROWS_PER_ENTITY_PATH) as f64 * DROP_AT_LEAST) as _,
-    ));
-    group.sample_size(10);
-
-    let gc_settings = GarbageCollectionOptions {
-        target: GarbageCollectionTarget::DropAtLeastFraction(DROP_AT_LEAST),
-        gc_timeless: true,
-        protect_latest: 1,
-        purge_empty_tables: false,
-        dont_protect: Default::default(),
-        enable_batching: false,
-        time_budget: std::time::Duration::MAX,
-    };
-
-    let mut timegen = |_| TimePoint::default();
-
-    let mut datagen = |i: usize| {
-        Box::new(re_types::archetypes::TextLog::new(i.to_string())) as Box<dyn AsComponents>
-    };
-
-    // Default config
-    group.bench_function("default", |b| {
-        let store = build_store(
-            Default::default(),
-            InstanceKey::name(),
-            false,
-            &mut timegen,
-            &mut datagen,
-        );
-        b.iter_batched(
-            || store.clone(),
-            |mut store| {
-                let (_, stats_diff) = store.gc(&gc_settings);
-                stats_diff
-            },
-            BatchSize::LargeInput,
-        );
-    });
-
-    for &num_rows_per_bucket in num_rows_per_bucket() {
-        for &gc_batching in gc_batching() {
-            group.bench_function(
-                if gc_batching {
-                    format!("bucketsz={num_rows_per_bucket}/gc_batching=true")
-                } else {
-                    format!("bucketsz={num_rows_per_bucket}")
-                },
-                |b| {
-                    let store = build_store(
-                        DataStoreConfig {
-                            indexed_bucket_num_rows: num_rows_per_bucket,
-                            ..Default::default()
-                        },
-                        InstanceKey::name(),
-                        false,
-                        &mut timegen,
-                        &mut datagen,
-                    );
-                    let mut gc_settings = gc_settings.clone();
-                    gc_settings.enable_batching = gc_batching;
-                    b.iter_batched(
-                        || store.clone(),
-                        |mut store| {
-                            let (_, stats_diff) = store.gc(&gc_settings);
-                            stats_diff
-                        },
-                        BatchSize::LargeInput,
-                    );
-                },
-            );
-        }
-    }
-}

 // --- Helpers ---

 fn build_store<FT, FD>(
5 changes: 1 addition & 4 deletions crates/re_data_store/src/lib.rs
@@ -43,12 +43,9 @@ pub use self::store_write::{WriteError, WriteResult};

 pub(crate) use self::store::{
     ClusterCellCache, IndexedBucket, IndexedBucketInner, IndexedTable, MetadataRegistry,
-    PersistentIndexedTable,
+    StaticCell, StaticTable,
 };

-#[allow(unused_imports)] // only used with some sets of feature flags atm
-pub(crate) use self::store::PersistentIndexedTableInner;
-
 // Re-exports
 #[doc(no_inline)]
 pub use arrow2::io::ipc::read::{StreamReader, StreamState};
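Conceptually, the `StaticCell`/`StaticTable` pair that replaces `PersistentIndexedTable` boils down to something like this (a sketch only; the field names and types are illustrative, not the actual definitions in `re_data_store::store`):

```rust
use std::collections::BTreeMap;

type ComponentName = &'static str;

struct StaticCell {
    row_id: u64,   // tiebreaker for last-write-wins overwrites
    cell: Vec<u8>, // stand-in for the Arrow-backed DataCell payload
}

/// One per entity: at most one static cell per component, no time index,
/// and never touched by the garbage collector.
struct StaticTable {
    cells: BTreeMap<ComponentName, StaticCell>,
}
```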