DataStore changelog 1: let DataStores know about their StoreId (#4202)

The upcoming `StoreView` works in global scope: by registering a view
you subscribe to changes to _all_ `DataStore`s, including those that are
yet to be created.
This is very powerful: it lets view & trigger implementers build
cross-recording indices and get notified as soon as recordings come in
and go out.

But it means that `StoreEvent`s must indicate which `DataStore` they
originate from, which isn't possible today since the stores themselves
don't know who they are to begin with.
This trivial PR plumbs the `StoreId` all the way through so `DataStore`s
know about their own ID.
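
Concretely, construction now takes the `StoreId` up front. A minimal sketch of the new call shape, mirroring the call sites touched in this diff (the crate paths and the `InstanceKey` cluster key are just the usual ones from the examples below, nothing new):

```rust
use re_arrow_store::{DataStore, DataStoreConfig};
use re_log_types::{StoreId, StoreKind};
use re_types::components::InstanceKey;
use re_types_core::Loggable as _; // brings `InstanceKey::name()` into scope

fn main() {
    // The store is tied to its `StoreId` from the moment it is created,
    // so anything it emits later can say which store it came from.
    let store = DataStore::new(
        StoreId::random(StoreKind::Recording),
        InstanceKey::name(),
        DataStoreConfig::default(),
    );

    // `generation()` works as before; see the next snippet for the
    // GC-aware comparison.
    let _gen = store.generation();
}
```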

Also made `StoreGeneration` account for the GC counter while I was at
it.
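
Since both counters now feed `StoreGeneration`'s equality, comparing snapshots also catches pure GC passes. A hedged sketch of that check (assuming `StoreGeneration` is reachable from the crate root like the other store types; the helper itself is illustrative, not part of this PR):

```rust
use re_arrow_store::{DataStore, StoreGeneration};

/// Illustrative only: has `store` been modified since `last_seen` was taken?
/// Because `StoreGeneration` now carries both `insert_id` and `gc_id`, this
/// returns `true` after a garbage collection pass even if nothing was inserted.
fn changed_since(store: &DataStore, last_seen: &StoreGeneration) -> bool {
    store.generation() != *last_seen
}
```

Callers would snapshot `store.generation()` once and re-check it before trusting anything derived from the store.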

---

Requires:
- #4215 

`DataStore` changelog PR series:
- #4202
- #4203
- #4205
- #4206
- #4208
- #4209
teh-cmc authored Nov 15, 2023
1 parent 7b53ba8 commit d0a42df
Showing 17 changed files with 242 additions and 59 deletions.
6 changes: 5 additions & 1 deletion crates/re_arrow_store/benches/data_store.rs
@@ -357,7 +357,11 @@ fn insert_table(
cluster_key: ComponentName,
table: &DataTable,
) -> DataStore {
let mut store = DataStore::new(cluster_key, config);
let mut store = DataStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
cluster_key,
config,
);
for row in table.to_rows() {
store.insert_row(&row.unwrap()).unwrap();
}
6 changes: 5 additions & 1 deletion crates/re_arrow_store/examples/dump_dataframe.rs
@@ -13,7 +13,11 @@ use re_types_core::Loggable as _;
// ---

fn main() {
let mut store = DataStore::new(InstanceKey::name(), Default::default());
let mut store = DataStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
InstanceKey::name(),
Default::default(),
);

let ent_paths = [
EntityPath::from("this/that"),
6 changes: 5 additions & 1 deletion crates/re_arrow_store/examples/latest_component.rs
@@ -15,7 +15,11 @@ use re_types::{
use re_types_core::Loggable as _;

fn main() {
let mut store = DataStore::new(InstanceKey::name(), Default::default());
let mut store = DataStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
InstanceKey::name(),
Default::default(),
);

let ent_path = EntityPath::from("my/entity");

6 changes: 5 additions & 1 deletion crates/re_arrow_store/examples/latest_components.rs
@@ -17,7 +17,11 @@ use re_types::{
use re_types_core::Loggable as _;

fn main() {
let mut store = DataStore::new(InstanceKey::name(), Default::default());
let mut store = DataStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
InstanceKey::name(),
Default::default(),
);

let ent_path = EntityPath::from("my/entity");

6 changes: 5 additions & 1 deletion crates/re_arrow_store/examples/range_components.rs
@@ -15,7 +15,11 @@ use re_types::{
use re_types_core::Loggable as _;

fn main() {
let mut store = DataStore::new(InstanceKey::name(), Default::default());
let mut store = DataStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
InstanceKey::name(),
Default::default(),
);

let ent_path = EntityPath::from("this/that");

34 changes: 22 additions & 12 deletions crates/re_arrow_store/src/store.rs
@@ -10,7 +10,7 @@ use smallvec::SmallVec;

use re_log_types::{
DataCell, DataCellColumn, EntityPath, EntityPathHash, ErasedTimeVec, NumInstancesVec, RowId,
RowIdVec, TimeInt, TimePoint, TimeRange, Timeline,
RowIdVec, StoreId, TimeInt, TimePoint, TimeRange, Timeline,
};

// --- Data store ---
@@ -156,9 +156,12 @@ impl std::ops::DerefMut for ClusterCellCache {

// ---

/// Incremented on each edit
/// Incremented on each edit.
#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct StoreGeneration(u64);
pub struct StoreGeneration {
insert_id: u64,
gc_id: u64,
}

/// A complete data store: covers all timelines, all entities, everything.
///
@@ -172,6 +175,8 @@ pub struct StoreGeneration(u64);
/// Additionally, if the `polars` feature is enabled, you can dump the entire datastore as a
/// flat denormalized dataframe using [`Self::to_dataframe`].
pub struct DataStore {
pub(crate) id: StoreId,

/// The cluster key specifies a column/component that is guaranteed to always be present for
/// every single row of data within the store.
///
@@ -193,6 +198,8 @@ pub struct DataStore {
/// the store so far.
///
/// See also [`Self::lookup_datatype`].
//
// TODO(#1809): replace this with a centralized Arrow registry.
pub(crate) type_registry: DataTypeRegistry,

/// Keeps track of arbitrary per-row metadata.
@@ -227,32 +234,31 @@ pub struct DataStore {
impl Clone for DataStore {
fn clone(&self) -> Self {
Self {
id: self.id.clone(),
cluster_key: self.cluster_key,
config: self.config.clone(),
type_registry: self.type_registry.clone(),
metadata_registry: self.metadata_registry.clone(),
cluster_cell_cache: self.cluster_cell_cache.clone(),
tables: self.tables.clone(),
timeless_tables: self.timeless_tables.clone(),
insert_id: self.insert_id,
query_id: self
.query_id
.load(std::sync::atomic::Ordering::Relaxed)
.into(),
gc_id: self.gc_id,
insert_id: Default::default(),
query_id: Default::default(),
gc_id: Default::default(),
}
}
}

impl DataStore {
/// See [`Self::cluster_key`] for more information about the cluster key.
pub fn new(cluster_key: ComponentName, config: DataStoreConfig) -> Self {
pub fn new(id: StoreId, cluster_key: ComponentName, config: DataStoreConfig) -> Self {
Self {
id,
cluster_key,
config,
cluster_cell_cache: Default::default(),
metadata_registry: Default::default(),
type_registry: Default::default(),
metadata_registry: Default::default(),
tables: Default::default(),
timeless_tables: Default::default(),
insert_id: 0,
@@ -272,7 +278,10 @@ impl DataStore {
/// Return the current `StoreGeneration`. This can be used to determine whether the
/// database has been modified since the last time it was queried.
pub fn generation(&self) -> StoreGeneration {
StoreGeneration(self.insert_id)
StoreGeneration {
insert_id: self.insert_id,
gc_id: self.gc_id,
}
}

/// See [`Self::cluster_key`] for more information about the cluster key.
@@ -340,6 +349,7 @@ fn datastore_internal_repr() {
use re_types_core::Loggable as _;

let mut store = DataStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
re_types::components::InstanceKey::name(),
DataStoreConfig {
indexed_bucket_num_rows: 0,
4 changes: 3 additions & 1 deletion crates/re_arrow_store/src/store_format.rs
@@ -9,11 +9,12 @@ impl std::fmt::Display for DataStore {
#[allow(clippy::string_add)]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self {
id,
cluster_key,
config,
cluster_cell_cache: _,
metadata_registry: _,
type_registry: _,
metadata_registry: _,
tables,
timeless_tables,
insert_id: _,
@@ -23,6 +24,7 @@

f.write_str("DataStore {\n")?;

f.write_str(&indent::indent_all_by(4, format!("id: {id}\n")))?;
f.write_str(&indent::indent_all_by(
4,
format!("cluster_key: {cluster_key:?}\n"),
60 changes: 50 additions & 10 deletions crates/re_arrow_store/tests/correctness.rs
@@ -36,7 +36,11 @@ fn row_id_ordering_semantics() -> anyhow::Result<()> {
// * Query at frame #11 and make sure we get `point2` because random `RowId`s are monotonically
// increasing.
{
let mut store = DataStore::new(InstanceKey::name(), Default::default());
let mut store = DataStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
InstanceKey::name(),
Default::default(),
);

let row_id = RowId::random();
let row = DataRow::from_component_batches(
@@ -73,7 +77,11 @@ fn row_id_ordering_semantics() -> anyhow::Result<()> {
// * Insert `point1` at frame #10 with a random `RowId`.
// * Fail to insert `point2` at frame #10 using `point1`s `RowId` because it is illegal.
{
let mut store = DataStore::new(InstanceKey::name(), Default::default());
let mut store = DataStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
InstanceKey::name(),
Default::default(),
);

let row_id = RowId::random();

@@ -100,7 +108,11 @@ fn row_id_ordering_semantics() -> anyhow::Result<()> {
// * Insert `point2` at frame #10 using `point1`'s `RowId`, decremented by one.
// * Query at frame #11 and make sure we get `point1` because of intra-timestamp tie-breaks.
{
let mut store = DataStore::new(InstanceKey::name(), Default::default());
let mut store = DataStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
InstanceKey::name(),
Default::default(),
);

let row_id1 = RowId::random();
let row_id2 = row_id1.next();
@@ -151,7 +163,11 @@ fn write_errors() {
DataCell::from_component_sparse::<InstanceKey>([Some(1), None, Some(3)])
}

let mut store = DataStore::new(InstanceKey::name(), Default::default());
let mut store = DataStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
InstanceKey::name(),
Default::default(),
);
let row = test_row!(ent_path @
[build_frame_nr(32.into()), build_log_time(Time::now())] => 3; [
build_sparse_instances(), build_some_positions2d(3)
@@ -171,7 +187,11 @@ fn write_errors() {
DataCell::from_component::<InstanceKey>([1, 2, 2])
}

let mut store = DataStore::new(InstanceKey::name(), Default::default());
let mut store = DataStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
InstanceKey::name(),
Default::default(),
);
{
let row = test_row!(ent_path @
[build_frame_nr(32.into()), build_log_time(Time::now())] => 3; [
@@ -195,7 +215,11 @@ }
}

{
let mut store = DataStore::new(InstanceKey::name(), Default::default());
let mut store = DataStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
InstanceKey::name(),
Default::default(),
);

let mut row = test_row!(ent_path @ [
build_frame_nr(1.into()),
@@ -228,7 +252,11 @@ fn latest_at_emptiness_edge_cases() {
init_logs();

for config in re_arrow_store::test_util::all_configs() {
let mut store = DataStore::new(InstanceKey::name(), config.clone());
let mut store = DataStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
InstanceKey::name(),
config.clone(),
);
latest_at_emptiness_edge_cases_impl(&mut store);
}
}
@@ -354,7 +382,11 @@ fn range_join_across_single_row() {
init_logs();

for config in re_arrow_store::test_util::all_configs() {
let mut store = DataStore::new(InstanceKey::name(), config.clone());
let mut store = DataStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
InstanceKey::name(),
config.clone(),
);
range_join_across_single_row_impl(&mut store);
}
}
@@ -422,7 +454,11 @@ fn range_join_across_single_row_impl(store: &mut DataStore) {
fn gc_correct() {
init_logs();

let mut store = DataStore::new(InstanceKey::name(), DataStoreConfig::default());
let mut store = DataStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
InstanceKey::name(),
DataStoreConfig::default(),
);

let stats_empty = DataStoreStats::from_store(&store);

@@ -501,7 +537,11 @@ fn entity_min_time_correct() -> anyhow::Result<()> {
init_logs();

for config in re_arrow_store::test_util::all_configs() {
let mut store = DataStore::new(InstanceKey::name(), config.clone());
let mut store = DataStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
InstanceKey::name(),
config.clone(),
);
entity_min_time_correct_impl(&mut store)?;
}

