Datastore revamp 5: DataStore::to_data_tables() #1791

Merged 1 commit on Apr 12, 2023
1 change: 1 addition & 0 deletions crates/re_arrow_store/src/lib.rs
@@ -17,6 +17,7 @@
mod arrow_util;
mod store;
mod store_arrow;
mod store_dump;
mod store_format;
mod store_gc;
mod store_read;
7 changes: 6 additions & 1 deletion crates/re_arrow_store/src/store.rs
@@ -14,7 +14,7 @@ use re_log_types::{

// --- Data store ---

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DataStoreConfig {
/// The maximum number of rows in an indexed bucket before triggering a split.
/// Does not apply to timeless data.
@@ -201,6 +201,11 @@ impl DataStore {
self.cluster_key
}

/// See [`DataStoreConfig`] for more information about configuration.
pub fn config(&self) -> &DataStoreConfig {
&self.config
}

/// Lookup the arrow [`DataType`] of a [`re_log_types::Component`] in the internal
/// `DataTypeRegistry`.
pub fn lookup_datatype(&self, component: &ComponentName) -> Option<&DataType> {
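The new accessor pairs naturally with the `PartialEq`/`Eq` derives added above: callers can clone a store's configuration to build another store with identical settings, and tests can compare configurations directly. A minimal sketch (the `DataStore::new(cluster_key, config)` signature is taken from the tests further down; the helper itself is illustrative):

use re_arrow_store::DataStore;

/// Build a fresh, empty store configured exactly like `store`.
/// Sketch only: relies on the `config()` accessor and the `PartialEq` derive
/// introduced in this diff.
fn empty_clone(store: &DataStore) -> DataStore {
    let store2 = DataStore::new(store.cluster_key(), store.config().clone());
    debug_assert_eq!(store.config(), store2.config()); // possible thanks to `PartialEq`
    store2
}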
195 changes: 195 additions & 0 deletions crates/re_arrow_store/src/store_dump.rs
@@ -0,0 +1,195 @@
use ahash::HashMapExt;
use arrow2::Either;
use nohash_hasher::IntMap;
use re_log_types::{
DataCellColumn, DataTable, ErasedTimeVec, RowIdVec, TableId, TimeRange, Timeline,
};

use crate::{
store::{IndexedBucketInner, PersistentIndexedTable},
DataStore, IndexedBucket,
};

// ---

impl DataStore {
/// Serializes the entire datastore into an iterator of [`DataTable`]s.
// TODO(#1793): This shouldn't dump cluster keys that were autogenerated.
// TODO(#1794): Implement simple recompaction.
pub fn to_data_tables(
&self,
time_filter: Option<(Timeline, TimeRange)>,
) -> impl Iterator<Item = DataTable> + '_ {
let timeless = self.dump_timeless_tables();
let temporal = if let Some(time_filter) = time_filter {
Either::Left(self.dump_temporal_tables_filtered(time_filter))
} else {
Either::Right(self.dump_temporal_tables())
};

timeless.chain(temporal)
}

fn dump_timeless_tables(&self) -> impl Iterator<Item = DataTable> + '_ {
self.timeless_tables.values().map(|table| {
crate::profile_scope!("timeless_table");

let PersistentIndexedTable {
ent_path,
cluster_key: _,
col_insert_id: _,
col_row_id,
col_num_instances,
columns,
} = table;

DataTable {
table_id: TableId::random(),
col_row_id: col_row_id.clone(),
col_timelines: Default::default(),
col_entity_path: std::iter::repeat_with(|| ent_path.clone())
.take(table.total_rows() as _)
.collect(),
col_num_instances: col_num_instances.clone(),
columns: columns.clone(), // shallow
}
})
}

fn dump_temporal_tables(&self) -> impl Iterator<Item = DataTable> + '_ {
self.tables.values().flat_map(|table| {
crate::profile_scope!("temporal_table");

table.buckets.values().map(move |bucket| {
crate::profile_scope!("temporal_bucket");

bucket.sort_indices_if_needed();

let IndexedBucket {
timeline,
cluster_key: _,
inner,
} = bucket;

let IndexedBucketInner {
is_sorted,
time_range: _,
col_time,
col_insert_id: _,
col_row_id,
col_num_instances,
columns,
size_bytes: _,
} = &*inner.read();
debug_assert!(is_sorted);

DataTable {
table_id: TableId::random(),
col_row_id: col_row_id.clone(),
col_timelines: [(*timeline, col_time.iter().copied().map(Some).collect())]
.into(),
col_entity_path: std::iter::repeat_with(|| table.ent_path.clone())
.take(table.total_rows() as _)
.collect(),
col_num_instances: col_num_instances.clone(),
columns: columns.clone(), // shallow
}
})
})
}

fn dump_temporal_tables_filtered(
&self,
(timeline_filter, time_filter): (Timeline, TimeRange),
) -> impl Iterator<Item = DataTable> + '_ {
self.tables
.values()
.filter_map(move |table| {
crate::profile_scope!("temporal_table_filtered");

if table.timeline != timeline_filter {
return None;
}

Some(table.buckets.values().filter_map(move |bucket| {
crate::profile_scope!("temporal_bucket_filtered");

bucket.sort_indices_if_needed();

let IndexedBucket {
timeline,
cluster_key: _,
inner,
} = bucket;

let IndexedBucketInner {
is_sorted,
time_range,
col_time,
col_insert_id: _,
col_row_id,
col_num_instances,
columns,
size_bytes: _,
} = &*inner.read();
debug_assert!(is_sorted);

if !time_range.intersects(time_filter) {
return None;
}

let col_row_id: RowIdVec =
filter_column(col_time, col_row_id.iter(), time_filter).collect();

// NOTE: Shouldn't ever happen due to check above, but better safe than
// sorry...
debug_assert!(!col_row_id.is_empty());
if col_row_id.is_empty() {
return None;
}

let col_timelines = [(
*timeline,
filter_column(col_time, col_time.iter(), time_filter)
.map(Some)
.collect(),
)]
.into();

let col_entity_path = std::iter::repeat_with(|| table.ent_path.clone())
.take(col_row_id.len())
.collect();

let col_num_instances =
filter_column(col_time, col_num_instances.iter(), time_filter).collect();

let mut columns2 = IntMap::with_capacity(columns.len());
for (component, column) in columns {
let column = filter_column(col_time, column.iter(), time_filter).collect();
columns2.insert(*component, DataCellColumn(column));
}

Some(DataTable {
table_id: TableId::random(),
col_row_id,
col_timelines,
col_entity_path,
col_num_instances,
columns: columns2,
})
}))
})
.flatten()
}
}

fn filter_column<'a, T: 'a + Clone>(
col_time: &'a ErasedTimeVec,
column: impl Iterator<Item = &'a T> + 'a,
time_filter: TimeRange,
) -> impl Iterator<Item = T> + 'a {
col_time
.iter()
.zip(column)
.filter_map(move |(time, v)| time_filter.contains((*time).into()).then(|| v.clone()))
}
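End to end, the dump path above supports a full save/reload round-trip: every DataTable yielded by `to_data_tables` can be fed straight back into `DataStore::insert_table`, which is exactly how the tests below stress it. A minimal sketch (assuming a populated `store`; only the helper itself is new here):

use re_arrow_store::DataStore;
use re_log_types::{TimeRange, Timeline};

/// Round-trip a store through `to_data_tables`, optionally keeping only rows
/// whose time on the given timeline falls within the given range.
fn round_trip(store: &DataStore, filter: Option<(Timeline, TimeRange)>) -> DataStore {
    let mut store2 = DataStore::new(store.cluster_key(), store.config().clone());
    for table in store.to_data_tables(filter) {
        store2.insert_table(&table).unwrap();
    }
    store2
}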
9 changes: 2 additions & 7 deletions crates/re_arrow_store/src/store_stats.rs
@@ -2,13 +2,12 @@ use nohash_hasher::IntMap;
use re_log_types::{ComponentName, DataCellColumn};

use crate::{
store::IndexedBucketInner, DataStore, DataStoreConfig, IndexedBucket, IndexedTable,
PersistentIndexedTable,
store::IndexedBucketInner, DataStore, IndexedBucket, IndexedTable, PersistentIndexedTable,
};

// ---

#[derive(Default, Debug, Clone)]
#[derive(Default, Debug, Clone, PartialEq, Eq, PartialOrd)]
[Review comment — Member] does it make sense to order the stats?

[Reply — Member Author] used in tests to check some basic invariants

pub struct DataStoreStats {
pub total_timeless_rows: u64,
pub total_timeless_size_bytes: u64,
@@ -19,8 +18,6 @@ pub struct DataStoreStats {

pub total_rows: u64,
pub total_size_bytes: u64,

pub config: DataStoreConfig,
}

impl DataStoreStats {
@@ -47,8 +44,6 @@ impl DataStoreStats {

total_rows,
total_size_bytes,

config: store.config.clone(),
}
}
}
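As the reply above suggests, the comparison derives let tests assert invariants over whole stats snapshots at once. A hypothetical sketch (the constructor name `DataStoreStats::from_store` is an assumption for illustration; the `impl DataStoreStats` block above only shows its body):

use re_arrow_store::{DataStore, DataStoreStats};

// Hypothetical invariant check in the spirit of the reply above.
fn assert_stats_preserved(before: &DataStore, after: &DataStore) {
    let stats_before = DataStoreStats::from_store(before); // assumed constructor name
    let stats_after = DataStoreStats::from_store(after);
    // `PartialEq` allows whole-snapshot equality checks after a round-trip...
    assert_eq!(stats_before, stats_after);
    // ...and the derived `PartialOrd` (lexicographic over the fields) would
    // additionally support ordering checks, e.g. after garbage collection:
    // assert!(stats_after <= stats_before);
}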
3 changes: 2 additions & 1 deletion crates/re_arrow_store/src/store_write.rs
@@ -214,7 +214,8 @@ impl DataStore {
let values =
arrow2::array::UInt64Array::from_vec((0..num_instances as u64).collect_vec())
.boxed();
let cell = DataCell::from_arrow(InstanceKey::name(), values);
let mut cell = DataCell::from_arrow(InstanceKey::name(), values);
cell.compute_size_bytes();

self.cluster_cell_cache
.insert(num_instances, cell.clone() /* shallow */);
55 changes: 41 additions & 14 deletions crates/re_arrow_store/tests/data_store.rs
@@ -20,7 +20,8 @@ use re_log_types::{
build_frame_nr, build_some_colors, build_some_instances, build_some_instances_from,
build_some_point2d, build_some_rects,
},
Component as _, ComponentName, DataCell, DataRow, EntityPath, TimeType, Timeline,
Component as _, ComponentName, DataCell, DataRow, DataTable, EntityPath, TableId, TimeType,
Timeline,
};

// TODO(#1619): introduce batching in the testing matrix
@@ -41,6 +42,13 @@

let assert_latest_components_at =
|store: &mut DataStore, ent_path: &EntityPath, expected: Option<&[ComponentName]>| {
// Stress test save-to-disk & load-from-disk
let mut store2 = DataStore::new(store.cluster_key(), store.config().clone());
for table in store.to_data_tables(None) {
store2.insert_table(&table).unwrap();
}

let mut store = store2;
let timeline = Timeline::new("frame_nr", TimeType::Sequence);

let components = store.all_components(&timeline, ent_path);
@@ -251,6 +259,7 @@ fn latest_at() {
for config in re_arrow_store::test_util::all_configs() {
let mut store = DataStore::new(InstanceKey::name(), config.clone());
latest_at_impl(&mut store);

// TODO(#1619): bring back garbage collection
// store.gc(
// GarbageCollectionTarget::DropAtLeastPercentage(1.0),
@@ -272,32 +281,43 @@ fn latest_at_impl(store: &mut DataStore) {
let frame3: TimeInt = 3.into();
let frame4: TimeInt = 4.into();

// helper to insert a row both as a temporal and timeless payload
let insert = |store: &mut DataStore, row| {
// helper to insert a table both as a temporal and timeless payload
let insert_table = |store: &mut DataStore, table: &DataTable| {
// insert temporal
store.insert_row(row).unwrap();
store.insert_table(table).unwrap();

// insert timeless
let mut row_timeless = (*row).clone();
row_timeless.timepoint = Default::default();
store.insert_row(&row_timeless).unwrap();
let mut table_timeless = table.clone();
table_timeless.col_timelines = Default::default();
store.insert_table(&table_timeless).unwrap();
};

let (instances1, colors1) = (build_some_instances(3), build_some_colors(3));
let row1 = test_row!(ent_path @ [build_frame_nr(frame1)] => 3; [instances1.clone(), colors1]);
insert(store, &row1);

let points2 = build_some_point2d(3);
let row2 = test_row!(ent_path @ [build_frame_nr(frame2)] => 3; [instances1, points2]);
insert(store, &row2);

let points3 = build_some_point2d(10);
let row3 = test_row!(ent_path @ [build_frame_nr(frame3)] => 10; [points3]);
insert(store, &row3);

let colors4 = build_some_colors(5);
let row4 = test_row!(ent_path @ [build_frame_nr(frame4)] => 5; [colors4]);
insert(store, &row4);

insert_table(
store,
&DataTable::from_rows(
TableId::random(),
[row1.clone(), row2.clone(), row3.clone(), row4.clone()],
),
);

// Stress test save-to-disk & load-from-disk
let mut store2 = DataStore::new(store.cluster_key(), store.config().clone());
for table in store.to_data_tables(None) {
store2.insert_table(&table).unwrap();
}
let mut store = store2;

if let err @ Err(_) = store.sanity_check() {
store.sort_indices_if_needed();
@@ -310,7 +330,7 @@
let components_all = &[ColorRGBA::name(), Point2D::name()];

let df = polars_util::latest_components(
store,
&store,
&LatestAtQuery::new(timeline_frame_nr, frame_nr),
&ent_path,
components_all,
@@ -433,10 +453,17 @@ fn range_impl(store: &mut DataStore) {
// A single timepoint might have several of those! That's one of the behaviors specific to
// range queries.
#[allow(clippy::type_complexity)]
let mut assert_range_components =
let assert_range_components =
|time_range: TimeRange,
components: [ComponentName; 2],
rows_at_times: &[(Option<TimeInt>, &[(ComponentName, &DataRow)])]| {
// Stress test save-to-disk & load-from-disk
let mut store2 = DataStore::new(store.cluster_key(), store.config().clone());
for table in store.to_data_tables(None) {
store2.insert_table(&table).unwrap();
}
let mut store = store2;

let mut expected_timeless = Vec::<DataFrame>::new();
let mut expected_at_times: IntMap<TimeInt, Vec<DataFrame>> = Default::default();

@@ -456,7 +483,7 @@
let components = [InstanceKey::name(), components[0], components[1]];
let query = RangeQuery::new(timeline_frame_nr, time_range);
let dfs = polars_util::range_components(
store,
&store,
&query,
&ent_path,
components[1],