Skip to content

Commit

Permalink
Datastore revamp 4: sunset MsgId (#1785)
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc authored Apr 12, 2023
1 parent 698e51b commit 6bd5323
Show file tree
Hide file tree
Showing 51 changed files with 460 additions and 443 deletions.
12 changes: 6 additions & 6 deletions crates/re_arrow_store/benches/data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use re_arrow_store::{DataStore, DataStoreConfig, LatestAtQuery, RangeQuery, Time
use re_log_types::{
component_types::{InstanceKey, Rect2D},
datagen::{build_frame_nr, build_some_instances, build_some_rects},
Component as _, ComponentName, DataCell, DataRow, DataTable, EntityPath, MsgId, TimeType,
Timeline,
Component as _, ComponentName, DataCell, DataRow, DataTable, EntityPath, RowId, TableId,
TimeType, Timeline,
};

criterion_group!(benches, insert, latest_at, latest_at_missing, range);
Expand Down Expand Up @@ -262,10 +262,10 @@ fn range(c: &mut Criterion) {

fn build_table(n: usize, packed: bool) -> DataTable {
let mut table = DataTable::from_rows(
MsgId::ZERO,
TableId::ZERO,
(0..NUM_ROWS).map(move |frame_idx| {
DataRow::from_cells2(
MsgId::random(),
RowId::random(),
"rects",
[build_frame_nr(frame_idx.into())],
n as _,
Expand All @@ -277,7 +277,7 @@ fn build_table(n: usize, packed: bool) -> DataTable {
// Do a serialization roundtrip to pack everything in contiguous memory.
if packed {
let (schema, columns) = table.serialize().unwrap();
table = DataTable::deserialize(MsgId::ZERO, &schema, &columns).unwrap();
table = DataTable::deserialize(TableId::ZERO, &schema, &columns).unwrap();
}

table
Expand All @@ -304,7 +304,7 @@ fn latest_data_at<const N: usize>(

store
.latest_at(&timeline_query, &ent_path, primary, secondaries)
.unwrap_or_else(|| [(); N].map(|_| None))
.map_or_else(|| [(); N].map(|_| None), |(_, cells)| cells)
}

fn range_data<const N: usize>(
Expand Down
6 changes: 3 additions & 3 deletions crates/re_arrow_store/src/polars_util.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use itertools::Itertools;
use polars_core::{prelude::*, series::Series};
use polars_ops::prelude::*;
use re_log_types::{ComponentName, DataCell, EntityPath, TimeInt};
use re_log_types::{ComponentName, DataCell, EntityPath, RowId, TimeInt};

use crate::{ArrayExt, DataStore, LatestAtQuery, RangeQuery};

Expand Down Expand Up @@ -37,9 +37,9 @@ pub fn latest_component(
let cluster_key = store.cluster_key();

let components = &[cluster_key, primary];
let cells = store
let (_, cells) = store
.latest_at(query, ent_path, primary, components)
.unwrap_or([(); 2].map(|_| None));
.unwrap_or((RowId::ZERO, [(); 2].map(|_| None)));

dataframe_from_cells(&cells)
}
Expand Down
43 changes: 21 additions & 22 deletions crates/re_arrow_store/src/store_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use itertools::Itertools;
use nohash_hasher::IntSet;
use re_log::trace;
use re_log_types::{
ComponentName, DataCell, EntityPath, MsgId, RowId, TimeInt, TimePoint, TimeRange, Timeline,
ComponentName, DataCell, EntityPath, RowId, TimeInt, TimePoint, TimeRange, Timeline,
};
use smallvec::SmallVec;

Expand Down Expand Up @@ -151,7 +151,7 @@ impl DataStore {
///
/// ```rust
/// # use polars_core::{prelude::*, series::Series};
/// # use re_log_types::{ComponentName, EntityPath, TimeInt};
/// # use re_log_types::{ComponentName, EntityPath, RowId, TimeInt};
/// # use re_arrow_store::{DataStore, LatestAtQuery, RangeQuery};
/// #
/// pub fn latest_component(
Expand All @@ -163,9 +163,9 @@ impl DataStore {
/// let cluster_key = store.cluster_key();
///
/// let components = &[cluster_key, primary];
/// let cells = store
/// .latest_at(query, ent_path, primary, components)
/// .unwrap_or([(); 2].map(|_| None));
/// let (_, cells) = store
/// .latest_at(&query, ent_path, primary, components)
/// .unwrap_or((RowId::ZERO, [(); 2].map(|_| None)));
///
/// let series: Result<Vec<_>, _> = cells
/// .iter()
Expand Down Expand Up @@ -193,7 +193,7 @@ impl DataStore {
ent_path: &EntityPath,
primary: ComponentName,
components: &[ComponentName; N],
) -> Option<[Option<DataCell>; N]> {
) -> Option<(RowId, [Option<DataCell>; N])> {
crate::profile_function!();

// TODO(cmc): kind & query_id need to somehow propagate through the span system.
Expand Down Expand Up @@ -232,7 +232,7 @@ impl DataStore {
// return the results immediately.
if cells
.as_ref()
.map_or(false, |cells| cells.iter().all(Option::is_some))
.map_or(false, |(_, cells)| cells.iter().all(Option::is_some))
{
return cells;
}
Expand Down Expand Up @@ -260,13 +260,13 @@ impl DataStore {
(None, Some(cells_timeless)) => return Some(cells_timeless),
// we have both temporal & timeless cells: let's merge the two when it makes sense
// and return the end result.
(Some(mut cells), Some(cells_timeless)) => {
(Some((row_id, mut cells)), Some((_, cells_timeless))) => {
for (i, row_idx) in cells_timeless.into_iter().enumerate() {
if cells[i].is_none() {
cells[i] = row_idx;
}
}
return Some(cells);
return Some((row_id, cells));
}
// no cells at all.
(None, None) => {}
Expand Down Expand Up @@ -320,7 +320,7 @@ impl DataStore {
/// ```rust
/// # use arrow2::array::Array;
/// # use polars_core::{prelude::*, series::Series};
/// # use re_log_types::{ComponentName, DataCell, EntityPath, TimeInt};
/// # use re_log_types::{ComponentName, DataCell, EntityPath, RowId, TimeInt};
/// # use re_arrow_store::{DataStore, LatestAtQuery, RangeQuery};
/// #
/// # pub fn dataframe_from_cells<const N: usize>(
Expand Down Expand Up @@ -354,9 +354,9 @@ impl DataStore {
/// let latest_time = query.range.min.as_i64().saturating_sub(1).into();
/// let df_latest = {
/// let query = LatestAtQuery::new(query.timeline, latest_time);
/// let cells = store
/// let (_, cells) = store
/// .latest_at(&query, ent_path, primary, &components)
/// .unwrap_or([(); 2].map(|_| None));
/// .unwrap_or((RowId::ZERO, [(); 2].map(|_| None)));
/// dataframe_from_cells(cells)
/// };
///
Expand Down Expand Up @@ -425,10 +425,10 @@ impl DataStore {
}
}

pub fn get_msg_metadata(&self, msg_id: &MsgId) -> Option<&TimePoint> {
pub fn get_msg_metadata(&self, row_id: &RowId) -> Option<&TimePoint> {
crate::profile_function!();

self.metadata_registry.get(msg_id)
self.metadata_registry.get(row_id)
}

/// Sort all unsorted indices in the store.
Expand All @@ -452,7 +452,7 @@ impl IndexedTable {
time: TimeInt,
primary: ComponentName,
components: &[ComponentName; N],
) -> Option<[Option<DataCell>; N]> {
) -> Option<(RowId, [Option<DataCell>; N])> {
crate::profile_function!();

// Early-exit if this entire table is unaware of this component.
Expand Down Expand Up @@ -660,16 +660,17 @@ impl IndexedBucket {
time: TimeInt,
primary: ComponentName,
components: &[ComponentName; N],
) -> Option<[Option<DataCell>; N]> {
) -> Option<(RowId, [Option<DataCell>; N])> {
crate::profile_function!();

self.sort_indices_if_needed();

let IndexedBucketInner {
is_sorted,
time_range: _,
col_time,
col_insert_id: _,
col_row_id: _,
col_row_id,
col_num_instances: _,
columns,
size_bytes: _,
Expand All @@ -679,8 +680,6 @@ impl IndexedBucket {
// Early-exit if this bucket is unaware of this component.
let column = columns.get(&primary)?;

crate::profile_function!();

trace!(
kind = "latest_at",
%primary,
Expand Down Expand Up @@ -759,7 +758,7 @@ impl IndexedBucket {
}
}

Some(cells)
Some((col_row_id[secondary_row_nr as usize], cells))
}

/// Iterates the bucket in order to return the cells of the the specified `components`,
Expand Down Expand Up @@ -983,7 +982,7 @@ impl PersistentIndexedTable {
&self,
primary: ComponentName,
components: &[ComponentName; N],
) -> Option<[Option<DataCell>; N]> {
) -> Option<(RowId, [Option<DataCell>; N])> {
if self.is_empty() {
return None;
}
Expand Down Expand Up @@ -1057,7 +1056,7 @@ impl PersistentIndexedTable {
}
}

Some(cells)
Some((self.col_row_id[secondary_row_nr as usize], cells))
}

/// Iterates the table in order to return the cells of the the specified `components`,
Expand Down
4 changes: 2 additions & 2 deletions crates/re_arrow_store/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::DataStoreConfig;
macro_rules! test_row {
($entity:ident @ $frames:tt => $n:expr; [$c0:expr $(,)*]) => {
::re_log_types::DataRow::from_cells1(
::re_log_types::MsgId::random(),
::re_log_types::RowId::random(),
$entity.clone(),
$frames,
$n,
Expand All @@ -16,7 +16,7 @@ macro_rules! test_row {
};
($entity:ident @ $frames:tt => $n:expr; [$c0:expr, $c1:expr $(,)*]) => {
::re_log_types::DataRow::from_cells2(
::re_log_types::MsgId::random(),
::re_log_types::RowId::random(),
$entity.clone(),
$frames,
$n,
Expand Down
26 changes: 13 additions & 13 deletions crates/re_arrow_store/tests/correctness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ fn latest_at_emptiness_edge_cases_impl(store: &mut DataStore) {
// bunch of non-existing components
{
let components = &["they".into(), "dont".into(), "exist".into()];
let cells = store
let (_, cells) = store
.latest_at(
&LatestAtQuery::new(timeline_frame_nr, frame40),
&ent_path,
Expand All @@ -159,7 +159,7 @@ fn latest_at_emptiness_edge_cases_impl(store: &mut DataStore) {

// empty component list
{
let cells = store
let (_, cells) = store
.latest_at(
&LatestAtQuery::new(timeline_frame_nr, frame40),
&ent_path,
Expand Down Expand Up @@ -309,53 +309,53 @@ fn gc_correct() {

// TODO(#1619): bring back garbage collection

// let msg_id_chunks = store.gc(
// let row_id_chunks = store.gc(
// GarbageCollectionTarget::DropAtLeastPercentage(1.0),
// Timeline::new("frame_nr", TimeType::Sequence),
// MsgId::name(),
// );

// let msg_ids = msg_id_chunks
// let row_ids = row_id_chunks
// .iter()
// .flat_map(|chunk| arrow_array_deserialize_iterator::<Option<MsgId>>(&**chunk).unwrap())
// .map(Option::unwrap) // MsgId is always present
// .collect::<ahash::HashSet<_>>();
// assert!(!msg_ids.is_empty());
// assert!(!row_ids.is_empty());

// if let err @ Err(_) = store.sanity_check() {
// store.sort_indices_if_needed();
// eprintln!("{store}");
// err.unwrap();
// }
// check_still_readable(&store);
// for msg_id in &msg_ids {
// assert!(store.get_msg_metadata(msg_id).is_some());
// for row_id in &row_ids {
// assert!(store.get_msg_metadata(row_id).is_some());
// }

// store.clear_msg_metadata(&msg_ids);
// store.clear_msg_metadata(&row_ids);

// if let err @ Err(_) = store.sanity_check() {
// store.sort_indices_if_needed();
// eprintln!("{store}");
// err.unwrap();
// }
// check_still_readable(&store);
// for msg_id in &msg_ids {
// assert!(store.get_msg_metadata(msg_id).is_none());
// for row_id in &row_ids {
// assert!(store.get_msg_metadata(row_id).is_none());
// }

// let msg_id_chunks = store.gc(
// let row_id_chunks = store.gc(
// GarbageCollectionTarget::DropAtLeastPercentage(1.0),
// Timeline::new("frame_nr", TimeType::Sequence),
// MsgId::name(),
// );

// let msg_ids = msg_id_chunks
// let row_ids = row_id_chunks
// .iter()
// .flat_map(|chunk| arrow_array_deserialize_iterator::<Option<MsgId>>(&**chunk).unwrap())
// .map(Option::unwrap) // MsgId is always present
// .collect::<ahash::HashSet<_>>();
// assert!(msg_ids.is_empty());
// assert!(row_ids.is_empty());

// if let err @ Err(_) = store.sanity_check() {
// store.sort_indices_if_needed();
Expand Down
Loading

0 comments on commit 6bd5323

Please sign in to comment.