Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Datastore revamp 4: sunset MsgId #1785

Merged
merged 1 commit into from
Apr 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions crates/re_arrow_store/benches/data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use re_arrow_store::{DataStore, DataStoreConfig, LatestAtQuery, RangeQuery, Time
use re_log_types::{
component_types::{InstanceKey, Rect2D},
datagen::{build_frame_nr, build_some_instances, build_some_rects},
Component as _, ComponentName, DataCell, DataRow, DataTable, EntityPath, MsgId, TimeType,
Timeline,
Component as _, ComponentName, DataCell, DataRow, DataTable, EntityPath, RowId, TableId,
TimeType, Timeline,
};

criterion_group!(benches, insert, latest_at, latest_at_missing, range);
Expand Down Expand Up @@ -262,10 +262,10 @@ fn range(c: &mut Criterion) {

fn build_table(n: usize, packed: bool) -> DataTable {
let mut table = DataTable::from_rows(
MsgId::ZERO,
TableId::ZERO,
(0..NUM_ROWS).map(move |frame_idx| {
DataRow::from_cells2(
MsgId::random(),
RowId::random(),
"rects",
[build_frame_nr(frame_idx.into())],
n as _,
Expand All @@ -277,7 +277,7 @@ fn build_table(n: usize, packed: bool) -> DataTable {
// Do a serialization roundtrip to pack everything in contiguous memory.
if packed {
let (schema, columns) = table.serialize().unwrap();
table = DataTable::deserialize(MsgId::ZERO, &schema, &columns).unwrap();
table = DataTable::deserialize(TableId::ZERO, &schema, &columns).unwrap();
}

table
Expand All @@ -304,7 +304,7 @@ fn latest_data_at<const N: usize>(

store
.latest_at(&timeline_query, &ent_path, primary, secondaries)
.unwrap_or_else(|| [(); N].map(|_| None))
.map_or_else(|| [(); N].map(|_| None), |(_, cells)| cells)
}

fn range_data<const N: usize>(
Expand Down
6 changes: 3 additions & 3 deletions crates/re_arrow_store/src/polars_util.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use itertools::Itertools;
use polars_core::{prelude::*, series::Series};
use polars_ops::prelude::*;
use re_log_types::{ComponentName, DataCell, EntityPath, TimeInt};
use re_log_types::{ComponentName, DataCell, EntityPath, RowId, TimeInt};

use crate::{ArrayExt, DataStore, LatestAtQuery, RangeQuery};

Expand Down Expand Up @@ -37,9 +37,9 @@ pub fn latest_component(
let cluster_key = store.cluster_key();

let components = &[cluster_key, primary];
let cells = store
let (_, cells) = store
.latest_at(query, ent_path, primary, components)
.unwrap_or([(); 2].map(|_| None));
.unwrap_or((RowId::ZERO, [(); 2].map(|_| None)));

dataframe_from_cells(&cells)
}
Expand Down
43 changes: 21 additions & 22 deletions crates/re_arrow_store/src/store_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use itertools::Itertools;
use nohash_hasher::IntSet;
use re_log::trace;
use re_log_types::{
ComponentName, DataCell, EntityPath, MsgId, RowId, TimeInt, TimePoint, TimeRange, Timeline,
ComponentName, DataCell, EntityPath, RowId, TimeInt, TimePoint, TimeRange, Timeline,
};
use smallvec::SmallVec;

Expand Down Expand Up @@ -151,7 +151,7 @@ impl DataStore {
///
/// ```rust
/// # use polars_core::{prelude::*, series::Series};
/// # use re_log_types::{ComponentName, EntityPath, TimeInt};
/// # use re_log_types::{ComponentName, EntityPath, RowId, TimeInt};
/// # use re_arrow_store::{DataStore, LatestAtQuery, RangeQuery};
/// #
/// pub fn latest_component(
Expand All @@ -163,9 +163,9 @@ impl DataStore {
/// let cluster_key = store.cluster_key();
///
/// let components = &[cluster_key, primary];
/// let cells = store
/// .latest_at(query, ent_path, primary, components)
/// .unwrap_or([(); 2].map(|_| None));
/// let (_, cells) = store
/// .latest_at(&query, ent_path, primary, components)
/// .unwrap_or((RowId::ZERO, [(); 2].map(|_| None)));
///
/// let series: Result<Vec<_>, _> = cells
/// .iter()
Expand Down Expand Up @@ -193,7 +193,7 @@ impl DataStore {
ent_path: &EntityPath,
primary: ComponentName,
components: &[ComponentName; N],
) -> Option<[Option<DataCell>; N]> {
) -> Option<(RowId, [Option<DataCell>; N])> {
crate::profile_function!();

// TODO(cmc): kind & query_id need to somehow propagate through the span system.
Expand Down Expand Up @@ -232,7 +232,7 @@ impl DataStore {
// return the results immediately.
if cells
.as_ref()
.map_or(false, |cells| cells.iter().all(Option::is_some))
.map_or(false, |(_, cells)| cells.iter().all(Option::is_some))
{
return cells;
}
Expand Down Expand Up @@ -260,13 +260,13 @@ impl DataStore {
(None, Some(cells_timeless)) => return Some(cells_timeless),
// we have both temporal & timeless cells: let's merge the two when it makes sense
// and return the end result.
(Some(mut cells), Some(cells_timeless)) => {
(Some((row_id, mut cells)), Some((_, cells_timeless))) => {
for (i, row_idx) in cells_timeless.into_iter().enumerate() {
if cells[i].is_none() {
cells[i] = row_idx;
}
}
return Some(cells);
return Some((row_id, cells));
}
// no cells at all.
(None, None) => {}
Expand Down Expand Up @@ -320,7 +320,7 @@ impl DataStore {
/// ```rust
/// # use arrow2::array::Array;
/// # use polars_core::{prelude::*, series::Series};
/// # use re_log_types::{ComponentName, DataCell, EntityPath, TimeInt};
/// # use re_log_types::{ComponentName, DataCell, EntityPath, RowId, TimeInt};
/// # use re_arrow_store::{DataStore, LatestAtQuery, RangeQuery};
/// #
/// # pub fn dataframe_from_cells<const N: usize>(
Expand Down Expand Up @@ -354,9 +354,9 @@ impl DataStore {
/// let latest_time = query.range.min.as_i64().saturating_sub(1).into();
/// let df_latest = {
/// let query = LatestAtQuery::new(query.timeline, latest_time);
/// let cells = store
/// let (_, cells) = store
/// .latest_at(&query, ent_path, primary, &components)
/// .unwrap_or([(); 2].map(|_| None));
/// .unwrap_or((RowId::ZERO, [(); 2].map(|_| None)));
/// dataframe_from_cells(cells)
/// };
///
Expand Down Expand Up @@ -425,10 +425,10 @@ impl DataStore {
}
}

pub fn get_msg_metadata(&self, msg_id: &MsgId) -> Option<&TimePoint> {
pub fn get_msg_metadata(&self, row_id: &RowId) -> Option<&TimePoint> {
crate::profile_function!();

self.metadata_registry.get(msg_id)
self.metadata_registry.get(row_id)
}

/// Sort all unsorted indices in the store.
Expand All @@ -452,7 +452,7 @@ impl IndexedTable {
time: TimeInt,
primary: ComponentName,
components: &[ComponentName; N],
) -> Option<[Option<DataCell>; N]> {
) -> Option<(RowId, [Option<DataCell>; N])> {
crate::profile_function!();

// Early-exit if this entire table is unaware of this component.
Expand Down Expand Up @@ -660,16 +660,17 @@ impl IndexedBucket {
time: TimeInt,
primary: ComponentName,
components: &[ComponentName; N],
) -> Option<[Option<DataCell>; N]> {
) -> Option<(RowId, [Option<DataCell>; N])> {
crate::profile_function!();

self.sort_indices_if_needed();

let IndexedBucketInner {
is_sorted,
time_range: _,
col_time,
col_insert_id: _,
col_row_id: _,
col_row_id,
col_num_instances: _,
columns,
size_bytes: _,
Expand All @@ -679,8 +680,6 @@ impl IndexedBucket {
// Early-exit if this bucket is unaware of this component.
let column = columns.get(&primary)?;

crate::profile_function!();

trace!(
kind = "latest_at",
%primary,
Expand Down Expand Up @@ -759,7 +758,7 @@ impl IndexedBucket {
}
}

Some(cells)
Some((col_row_id[secondary_row_nr as usize], cells))
}

/// Iterates the bucket in order to return the cells of the the specified `components`,
Expand Down Expand Up @@ -983,7 +982,7 @@ impl PersistentIndexedTable {
&self,
primary: ComponentName,
components: &[ComponentName; N],
) -> Option<[Option<DataCell>; N]> {
) -> Option<(RowId, [Option<DataCell>; N])> {
if self.is_empty() {
return None;
}
Expand Down Expand Up @@ -1057,7 +1056,7 @@ impl PersistentIndexedTable {
}
}

Some(cells)
Some((self.col_row_id[secondary_row_nr as usize], cells))
}

/// Iterates the table in order to return the cells of the the specified `components`,
Expand Down
4 changes: 2 additions & 2 deletions crates/re_arrow_store/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::DataStoreConfig;
macro_rules! test_row {
($entity:ident @ $frames:tt => $n:expr; [$c0:expr $(,)*]) => {
::re_log_types::DataRow::from_cells1(
::re_log_types::MsgId::random(),
::re_log_types::RowId::random(),
$entity.clone(),
$frames,
$n,
Expand All @@ -16,7 +16,7 @@ macro_rules! test_row {
};
($entity:ident @ $frames:tt => $n:expr; [$c0:expr, $c1:expr $(,)*]) => {
::re_log_types::DataRow::from_cells2(
::re_log_types::MsgId::random(),
::re_log_types::RowId::random(),
$entity.clone(),
$frames,
$n,
Expand Down
26 changes: 13 additions & 13 deletions crates/re_arrow_store/tests/correctness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ fn latest_at_emptiness_edge_cases_impl(store: &mut DataStore) {
// bunch of non-existing components
{
let components = &["they".into(), "dont".into(), "exist".into()];
let cells = store
let (_, cells) = store
.latest_at(
&LatestAtQuery::new(timeline_frame_nr, frame40),
&ent_path,
Expand All @@ -159,7 +159,7 @@ fn latest_at_emptiness_edge_cases_impl(store: &mut DataStore) {

// empty component list
{
let cells = store
let (_, cells) = store
.latest_at(
&LatestAtQuery::new(timeline_frame_nr, frame40),
&ent_path,
Expand Down Expand Up @@ -309,53 +309,53 @@ fn gc_correct() {

// TODO(#1619): bring back garbage collection

// let msg_id_chunks = store.gc(
// let row_id_chunks = store.gc(
// GarbageCollectionTarget::DropAtLeastPercentage(1.0),
// Timeline::new("frame_nr", TimeType::Sequence),
// MsgId::name(),
// );

// let msg_ids = msg_id_chunks
// let row_ids = row_id_chunks
// .iter()
// .flat_map(|chunk| arrow_array_deserialize_iterator::<Option<MsgId>>(&**chunk).unwrap())
// .map(Option::unwrap) // MsgId is always present
// .collect::<ahash::HashSet<_>>();
// assert!(!msg_ids.is_empty());
// assert!(!row_ids.is_empty());

// if let err @ Err(_) = store.sanity_check() {
// store.sort_indices_if_needed();
// eprintln!("{store}");
// err.unwrap();
// }
// check_still_readable(&store);
// for msg_id in &msg_ids {
// assert!(store.get_msg_metadata(msg_id).is_some());
// for row_id in &row_ids {
// assert!(store.get_msg_metadata(row_id).is_some());
// }

// store.clear_msg_metadata(&msg_ids);
// store.clear_msg_metadata(&row_ids);

// if let err @ Err(_) = store.sanity_check() {
// store.sort_indices_if_needed();
// eprintln!("{store}");
// err.unwrap();
// }
// check_still_readable(&store);
// for msg_id in &msg_ids {
// assert!(store.get_msg_metadata(msg_id).is_none());
// for row_id in &row_ids {
// assert!(store.get_msg_metadata(row_id).is_none());
// }

// let msg_id_chunks = store.gc(
// let row_id_chunks = store.gc(
// GarbageCollectionTarget::DropAtLeastPercentage(1.0),
// Timeline::new("frame_nr", TimeType::Sequence),
// MsgId::name(),
// );

// let msg_ids = msg_id_chunks
// let row_ids = row_id_chunks
// .iter()
// .flat_map(|chunk| arrow_array_deserialize_iterator::<Option<MsgId>>(&**chunk).unwrap())
// .map(Option::unwrap) // MsgId is always present
// .collect::<ahash::HashSet<_>>();
// assert!(msg_ids.is_empty());
// assert!(row_ids.is_empty());

// if let err @ Err(_) = store.sanity_check() {
// store.sort_indices_if_needed();
Expand Down
Loading