Datastore revamp 5:
teh-cmc committed Apr 12, 2023 · 1 parent 6bd5323 · commit 150ce29
Showing 12 changed files with 514 additions and 36 deletions.
1 change: 1 addition & 0 deletions crates/re_arrow_store/src/lib.rs
@@ -17,6 +17,7 @@
mod arrow_util;
mod store;
mod store_arrow;
+mod store_dump;
mod store_format;
mod store_gc;
mod store_read;
7 changes: 6 additions & 1 deletion crates/re_arrow_store/src/store.rs
@@ -14,7 +14,7 @@ use re_log_types::{

// --- Data store ---

-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DataStoreConfig {
/// The maximum number of rows in an indexed bucket before triggering a split.
/// Does not apply to timeless data.
@@ -201,6 +201,11 @@ impl DataStore {
self.cluster_key
}

+/// See [`DataStoreConfig`] for more information about configuration.
+pub fn config(&self) -> &DataStoreConfig {
+&self.config
+}

/// Lookup the arrow [`DataType`] of a [`re_log_types::Component`] in the internal
/// `DataTypeRegistry`.
pub fn lookup_datatype(&self, component: &ComponentName) -> Option<&DataType> {
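Taken together, the new `config()` accessor and the `PartialEq`/`Eq` derive on `DataStoreConfig` let callers clone one store's configuration into a fresh store and assert that the two configurations match. A minimal sketch, assuming an existing `store: DataStore` (the same pattern appears in this commit's tests):

// Spin up a second store with the exact same cluster key and configuration.
let store2 = DataStore::new(store.cluster_key(), store.config().clone());

// Comparing configs directly is only possible now that DataStoreConfig derives PartialEq + Eq.
assert_eq!(store.config(), store2.config());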
195 changes: 195 additions & 0 deletions crates/re_arrow_store/src/store_dump.rs
@@ -0,0 +1,195 @@
use ahash::HashMapExt;
use arrow2::Either;
use nohash_hasher::IntMap;
use re_log_types::{
DataCellColumn, DataTable, ErasedTimeVec, RowIdVec, TableId, TimeRange, Timeline,
};

use crate::{
store::{IndexedBucketInner, PersistentIndexedTable},
DataStore, IndexedBucket,
};

// ---

impl DataStore {
/// Serializes the entire datastore into an iterator of [`DataTable`]s.
// TODO(#1793): This shouldn't dump cluster keys that were autogenerated.
// TODO(#1794): Implement simple recompaction.
pub fn to_data_tables(
&self,
time_filter: Option<(Timeline, TimeRange)>,
) -> impl Iterator<Item = DataTable> + '_ {
let timeless = self.dump_timeless_tables();
let temporal = if let Some(time_filter) = time_filter {
Either::Left(self.dump_temporal_tables_filtered(time_filter))
} else {
Either::Right(self.dump_temporal_tables())
};

timeless.chain(temporal)
}

fn dump_timeless_tables(&self) -> impl Iterator<Item = DataTable> + '_ {
self.timeless_tables.values().map(|table| {
crate::profile_scope!("timeless_table");

let PersistentIndexedTable {
ent_path,
cluster_key: _,
col_insert_id: _,
col_row_id,
col_num_instances,
columns,
} = table;

DataTable {
table_id: TableId::random(),
col_row_id: col_row_id.clone(),
col_timelines: Default::default(),
col_entity_path: std::iter::repeat_with(|| ent_path.clone())
.take(table.total_rows() as _)
.collect(),
col_num_instances: col_num_instances.clone(),
columns: columns.clone(), // shallow
}
})
}

fn dump_temporal_tables(&self) -> impl Iterator<Item = DataTable> + '_ {
self.tables.values().flat_map(|table| {
crate::profile_scope!("temporal_table");

table.buckets.values().map(move |bucket| {
crate::profile_scope!("temporal_bucket");

bucket.sort_indices_if_needed();

let IndexedBucket {
timeline,
cluster_key: _,
inner,
} = bucket;

let IndexedBucketInner {
is_sorted,
time_range: _,
col_time,
col_insert_id: _,
col_row_id,
col_num_instances,
columns,
size_bytes: _,
} = &*inner.read();
debug_assert!(is_sorted);

DataTable {
table_id: TableId::random(),
col_row_id: col_row_id.clone(),
col_timelines: [(*timeline, col_time.iter().copied().map(Some).collect())]
.into(),
col_entity_path: std::iter::repeat_with(|| table.ent_path.clone())
.take(table.total_rows() as _)
.collect(),
col_num_instances: col_num_instances.clone(),
columns: columns.clone(), // shallow
}
})
})
}

fn dump_temporal_tables_filtered(
&self,
(timeline_filter, time_filter): (Timeline, TimeRange),
) -> impl Iterator<Item = DataTable> + '_ {
self.tables
.values()
.filter_map(move |table| {
crate::profile_scope!("temporal_table_filtered");

if table.timeline != timeline_filter {
return None;
}

Some(table.buckets.values().filter_map(move |bucket| {
crate::profile_scope!("temporal_bucket_filtered");

bucket.sort_indices_if_needed();

let IndexedBucket {
timeline,
cluster_key: _,
inner,
} = bucket;

let IndexedBucketInner {
is_sorted,
time_range,
col_time,
col_insert_id: _,
col_row_id,
col_num_instances,
columns,
size_bytes: _,
} = &*inner.read();
debug_assert!(is_sorted);

if !time_range.intersects(time_filter) {
return None;
}

let col_row_id: RowIdVec =
filter_column(col_time, col_row_id.iter(), time_filter).collect();

// NOTE: Shouldn't ever happen due to check above, but better safe than
// sorry...
debug_assert!(!col_row_id.is_empty());
if col_row_id.is_empty() {
return None;
}

let col_timelines = [(
*timeline,
filter_column(col_time, col_time.iter(), time_filter)
.map(Some)
.collect(),
)]
.into();

let col_entity_path = std::iter::repeat_with(|| table.ent_path.clone())
.take(col_row_id.len())
.collect();

let col_num_instances =
filter_column(col_time, col_num_instances.iter(), time_filter).collect();

let mut columns2 = IntMap::with_capacity(columns.len());
for (component, column) in columns {
let column = filter_column(col_time, column.iter(), time_filter).collect();
columns2.insert(*component, DataCellColumn(column));
}

Some(DataTable {
table_id: TableId::random(),
col_row_id,
col_timelines,
col_entity_path,
col_num_instances,
columns: columns2,
})
}))
})
.flatten()
}
}

fn filter_column<'a, T: 'a + Clone>(
col_time: &'a ErasedTimeVec,
column: impl Iterator<Item = &'a T> + 'a,
time_filter: TimeRange,
) -> impl Iterator<Item = T> + 'a {
col_time
.iter()
.zip(column)
.filter_map(move |(time, v)| time_filter.contains((*time).into()).then(|| v.clone()))
}
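A rough usage sketch for the new dump API, mirroring the save-to-disk/load-from-disk round-trip exercised by this commit's tests. `TimeRange::new(min, max)` is an assumption about the `re_log_types` API; everything else appears in the diff above:

// Full dump: timeless tables first, then every temporal bucket.
let mut store2 = DataStore::new(store.cluster_key(), store.config().clone());
for table in store.to_data_tables(None) {
    store2.insert_table(&table).unwrap();
}

// Filtered dump: keep only rows whose time on the `frame_nr` timeline
// falls within [2, 4] (assumes a TimeRange::new constructor).
let timeline = Timeline::new("frame_nr", TimeType::Sequence);
for table in store.to_data_tables(Some((timeline, TimeRange::new(2.into(), 4.into())))) {
    store2.insert_table(&table).unwrap();
}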
9 changes: 2 additions & 7 deletions crates/re_arrow_store/src/store_stats.rs
@@ -2,13 +2,12 @@ use nohash_hasher::IntMap;
use re_log_types::{ComponentName, DataCellColumn};

use crate::{
-store::IndexedBucketInner, DataStore, DataStoreConfig, IndexedBucket, IndexedTable,
-PersistentIndexedTable,
+store::IndexedBucketInner, DataStore, IndexedBucket, IndexedTable, PersistentIndexedTable,
};

// ---

-#[derive(Default, Debug, Clone)]
+#[derive(Default, Debug, Clone, PartialEq, Eq, PartialOrd)]
pub struct DataStoreStats {
pub total_timeless_rows: u64,
pub total_timeless_size_bytes: u64,
@@ -19,8 +18,6 @@ pub struct DataStoreStats {

pub total_rows: u64,
pub total_size_bytes: u64,

-pub config: DataStoreConfig,
}

impl DataStoreStats {
@@ -47,8 +44,6 @@ impl DataStoreStats {

total_rows,
total_size_bytes,

-config: store.config.clone(),
}
}
}
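With `config` moved out of the stats and the new comparison derives in place, two stats snapshots can now be compared directly — for example, to verify that a dump/reload round-trip preserves row counts. A hedged sketch; the constructor name `DataStoreStats::from_store` is an assumption, since the diff only shows the inside of `impl DataStoreStats`:

let stats_before = DataStoreStats::from_store(&store); // assumed constructor name

let mut store2 = DataStore::new(store.cluster_key(), store.config().clone());
for table in store.to_data_tables(None) {
    store2.insert_table(&table).unwrap();
}

// Insert IDs are not dumped, so byte sizes may legitimately differ;
// compare row counts rather than whole stats structs.
let stats_after = DataStoreStats::from_store(&store2);
assert_eq!(stats_before.total_rows, stats_after.total_rows);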
3 changes: 2 additions & 1 deletion crates/re_arrow_store/src/store_write.rs
@@ -214,7 +214,8 @@ impl DataStore {
let values =
arrow2::array::UInt64Array::from_vec((0..num_instances as u64).collect_vec())
.boxed();
-let cell = DataCell::from_arrow(InstanceKey::name(), values);
+let mut cell = DataCell::from_arrow(InstanceKey::name(), values);
+cell.compute_size_bytes();

self.cluster_cell_cache
.insert(num_instances, cell.clone() /* shallow */);
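The extra call matters because the cached cluster cell is shallow-cloned into every auto-generated cluster column, so its size must be measured once before it enters the cache. A minimal sketch of the pattern, using only calls that appear in the diff:

// Build a cell straight from an arrow array; from_arrow alone does not
// compute the cell's byte size.
let values = arrow2::array::UInt64Array::from_vec(vec![0u64, 1, 2]).boxed();
let mut cell = DataCell::from_arrow(InstanceKey::name(), values);

// Measure once, before the cell is cached and cloned, so per-bucket size
// accounting stays correct.
cell.compute_size_bytes();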
55 changes: 41 additions & 14 deletions crates/re_arrow_store/tests/data_store.rs
@@ -20,7 +20,8 @@ use re_log_types::{
build_frame_nr, build_some_colors, build_some_instances, build_some_instances_from,
build_some_point2d, build_some_rects,
},
-Component as _, ComponentName, DataCell, DataRow, EntityPath, TimeType, Timeline,
+Component as _, ComponentName, DataCell, DataRow, DataTable, EntityPath, TableId, TimeType,
+Timeline,
};

// TODO(#1619): introduce batching in the testing matrix
@@ -41,6 +42,13 @@ fn all_components() {

let assert_latest_components_at =
|store: &mut DataStore, ent_path: &EntityPath, expected: Option<&[ComponentName]>| {
+// Stress test save-to-disk & load-from-disk
+let mut store2 = DataStore::new(store.cluster_key(), store.config().clone());
+for table in store.to_data_tables(None) {
+store2.insert_table(&table).unwrap();
+}

+let mut store = store2;
let timeline = Timeline::new("frame_nr", TimeType::Sequence);

let components = store.all_components(&timeline, ent_path);
@@ -251,6 +259,7 @@ fn latest_at() {
for config in re_arrow_store::test_util::all_configs() {
let mut store = DataStore::new(InstanceKey::name(), config.clone());
latest_at_impl(&mut store);

// TODO(#1619): bring back garbage collection
// store.gc(
// GarbageCollectionTarget::DropAtLeastPercentage(1.0),
@@ -272,32 +281,43 @@ fn latest_at_impl(store: &mut DataStore) {
let frame3: TimeInt = 3.into();
let frame4: TimeInt = 4.into();

-// helper to insert a row both as a temporal and timeless payload
-let insert = |store: &mut DataStore, row| {
+// helper to insert a table both as a temporal and timeless payload
+let insert_table = |store: &mut DataStore, table: &DataTable| {
// insert temporal
-store.insert_row(row).unwrap();
+store.insert_table(table).unwrap();

// insert timeless
-let mut row_timeless = (*row).clone();
-row_timeless.timepoint = Default::default();
-store.insert_row(&row_timeless).unwrap();
+let mut table_timeless = table.clone();
+table_timeless.col_timelines = Default::default();
+store.insert_table(&table_timeless).unwrap();
};

let (instances1, colors1) = (build_some_instances(3), build_some_colors(3));
let row1 = test_row!(ent_path @ [build_frame_nr(frame1)] => 3; [instances1.clone(), colors1]);
-insert(store, &row1);

let points2 = build_some_point2d(3);
let row2 = test_row!(ent_path @ [build_frame_nr(frame2)] => 3; [instances1, points2]);
-insert(store, &row2);

let points3 = build_some_point2d(10);
let row3 = test_row!(ent_path @ [build_frame_nr(frame3)] => 10; [points3]);
-insert(store, &row3);

let colors4 = build_some_colors(5);
let row4 = test_row!(ent_path @ [build_frame_nr(frame4)] => 5; [colors4]);
-insert(store, &row4);

+insert_table(
+store,
+&DataTable::from_rows(
+TableId::random(),
+[row1.clone(), row2.clone(), row3.clone(), row4.clone()],
+),
+);

+// Stress test save-to-disk & load-from-disk
+let mut store2 = DataStore::new(store.cluster_key(), store.config().clone());
+for table in store.to_data_tables(None) {
+store2.insert_table(&table).unwrap();
+}
+let mut store = store2;

if let err @ Err(_) = store.sanity_check() {
store.sort_indices_if_needed();
Expand All @@ -310,7 +330,7 @@ fn latest_at_impl(store: &mut DataStore) {
let components_all = &[ColorRGBA::name(), Point2D::name()];

let df = polars_util::latest_components(
-store,
+&store,
&LatestAtQuery::new(timeline_frame_nr, frame_nr),
&ent_path,
components_all,
@@ -433,10 +453,17 @@ fn range_impl(store: &mut DataStore) {
// A single timepoint might have several of those! That's one of the behaviors specific to
// range queries.
#[allow(clippy::type_complexity)]
-let mut assert_range_components =
+let assert_range_components =
|time_range: TimeRange,
components: [ComponentName; 2],
rows_at_times: &[(Option<TimeInt>, &[(ComponentName, &DataRow)])]| {
+// Stress test save-to-disk & load-from-disk
+let mut store2 = DataStore::new(store.cluster_key(), store.config().clone());
+for table in store.to_data_tables(None) {
+store2.insert_table(&table).unwrap();
+}
+let mut store = store2;

let mut expected_timeless = Vec::<DataFrame>::new();
let mut expected_at_times: IntMap<TimeInt, Vec<DataFrame>> = Default::default();

@@ -456,7 +483,7 @@
let components = [InstanceKey::name(), components[0], components[1]];
let query = RangeQuery::new(timeline_frame_nr, time_range);
let dfs = polars_util::range_components(
-store,
+&store,
&query,
&ent_path,
components[1],