Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc committed Apr 4, 2023
1 parent 05f4876 commit 3bcf851
Show file tree
Hide file tree
Showing 11 changed files with 355 additions and 244 deletions.
47 changes: 12 additions & 35 deletions crates/re_arrow_store/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ fn datastore_internal_repr() {
},
);

let timeless = DataTable::example(false);
let timeless = DataTable::example(true);
eprintln!("{timeless}");
store.insert_table(&timeless).unwrap();

Expand Down Expand Up @@ -316,17 +316,6 @@ pub struct IndexedTable {
/// have been set in the past even if all instances of that component have since been purged
/// to free up space.
pub all_components: IntSet<ComponentName>,

/// The total number of rows in this indexed table, accounting for all buckets.
pub total_rows: u64,

/// The size of this table in bytes across all of its buckets, accounting for both data and
/// metadata.
///
/// Accurately computing the size of arrow arrays is surprisingly costly, which is why we
/// cache this.
/// Also: there are many buckets.
pub total_size_bytes: u64,
}

impl IndexedTable {
Expand All @@ -337,18 +326,8 @@ impl IndexedTable {
buckets: [(i64::MIN.into(), IndexedBucket::new(cluster_key, timeline))].into(),
cluster_key,
all_components: Default::default(),
total_rows: 0,
total_size_bytes: 0, // TODO(#1619)
}
}

/// Returns a read-only iterator over the raw buckets.
///
/// Do _not_ use this to try and test the internal state of the datastore.
#[doc(hidden)]
pub fn iter_buckets(&self) -> impl ExactSizeIterator<Item = &IndexedBucket> {
self.buckets.values()
}
}

/// An `IndexedBucket` holds a chunk of rows from an [`IndexedTable`]
Expand Down Expand Up @@ -413,12 +392,6 @@ pub struct IndexedBucketInner {
/// The cells are optional since not all rows will have data for every single component
/// (i.e. the table is sparse).
pub columns: IntMap<ComponentName, DataCellColumn>,

/// The size of this bucket in bytes, accounting for both data and metadata.
///
/// Accurately computing the size of arrow arrays is surprisingly costly, which is why we
/// cache this.
pub total_size_bytes: u64,
}

impl Default for IndexedBucketInner {
Expand All @@ -431,7 +404,6 @@ impl Default for IndexedBucketInner {
col_row_id: Default::default(),
col_num_instances: Default::default(),
columns: Default::default(),
total_size_bytes: 0, // TODO(#1619)
}
}
}
Expand Down Expand Up @@ -476,15 +448,20 @@ pub struct PersistentIndexedTable {
/// The cells are optional since not all rows will have data for every single component
/// (i.e. the table is sparse).
pub columns: IntMap<ComponentName, DataCellColumn>,

/// The size of this indexed table in bytes, accounting for both data and metadata.
///
/// Accurately computing the size of arrow arrays is surprisingly costly, which is why we
/// cache this.
pub total_size_bytes: u64,
}

impl PersistentIndexedTable {
pub fn new(cluster_key: ComponentName, ent_path: EntityPath) -> Self {
Self {
cluster_key,
ent_path,
col_insert_id: Default::default(),
col_row_id: Default::default(),
col_num_instances: Default::default(),
columns: Default::default(),
}
}

pub fn is_empty(&self) -> bool {
self.col_num_instances.is_empty()
}
Expand Down
2 changes: 0 additions & 2 deletions crates/re_arrow_store/src/store_arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ impl IndexedBucket {
col_row_id,
col_num_instances,
columns,
total_size_bytes: _,
} = &*inner.read();

serialize(
Expand Down Expand Up @@ -72,7 +71,6 @@ impl PersistentIndexedTable {
col_row_id,
col_num_instances,
columns,
total_size_bytes: _,
} = self;

serialize(
Expand Down
15 changes: 6 additions & 9 deletions crates/re_arrow_store/src/store_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ impl std::fmt::Display for DataStore {
format!(
"{} timeless indexed tables, for a total of {} across {} total rows\n",
timeless_tables.len(),
format_bytes(self.total_timeless_index_size_bytes() as _),
format_number(self.total_timeless_index_rows() as _)
format_bytes(self.total_timeless_size_bytes() as _),
format_number(self.total_timeless_rows() as _)
),
))?;
f.write_str(&indent::indent_all_by(4, "timeless_tables: [\n"))?;
Expand All @@ -53,8 +53,8 @@ impl std::fmt::Display for DataStore {
format!(
"{} indexed tables, for a total of {} across {} total rows\n",
tables.len(),
format_bytes(self.total_temporal_index_size_bytes() as _),
format_number(self.total_temporal_index_rows() as _)
format_bytes(self.total_temporal_size_bytes() as _),
format_number(self.total_temporal_rows() as _)
),
))?;
f.write_str(&indent::indent_all_by(4, "tables: [\n"))?;
Expand Down Expand Up @@ -83,8 +83,6 @@ impl std::fmt::Display for IndexedTable {
buckets,
cluster_key: _,
all_components: _,
total_rows: _,
total_size_bytes: _,
} = self;

f.write_fmt(format_args!("timeline: {}\n", timeline.name()))?;
Expand Down Expand Up @@ -116,8 +114,8 @@ impl std::fmt::Display for IndexedBucket {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_fmt(format_args!(
"size: {} across {} rows\n",
format_bytes(self.total_size_bytes() as _),
format_number(self.total_rows() as _),
format_bytes(self.size_bytes() as _),
format_number(self.num_rows() as _),
))?;

let time_range = {
Expand Down Expand Up @@ -156,7 +154,6 @@ impl std::fmt::Display for PersistentIndexedTable {
col_row_id: _,
col_num_instances: _,
columns: _,
total_size_bytes: _,
} = self;

f.write_fmt(format_args!("entity: {ent_path}\n"))?;
Expand Down
2 changes: 0 additions & 2 deletions crates/re_arrow_store/src/store_polars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ impl PersistentIndexedTable {
col_row_id,
col_num_instances,
columns,
total_size_bytes: _,
} = self;

let num_rows = self.total_rows() as usize;
Expand Down Expand Up @@ -217,7 +216,6 @@ impl IndexedBucket {
col_row_id,
col_num_instances,
columns,
total_size_bytes: _,
} = &*self.inner.read();

let (_, times) = DataTable::serialize_primitive_column(
Expand Down
3 changes: 0 additions & 3 deletions crates/re_arrow_store/src/store_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,6 @@ impl IndexedBucket {
col_row_id: _,
col_num_instances: _,
columns,
total_size_bytes: _, // TODO(#1619)
} = &*self.inner.read();
debug_assert!(is_sorted);

Expand Down Expand Up @@ -761,7 +760,6 @@ impl IndexedBucket {
col_row_id,
col_num_instances: _,
columns,
total_size_bytes: _, // TODO(#1619)
} = &*self.inner.read();
debug_assert!(is_sorted);

Expand Down Expand Up @@ -873,7 +871,6 @@ impl IndexedBucketInner {
col_row_id,
col_num_instances,
columns,
total_size_bytes: _,
} = self;

if *is_sorted {
Expand Down
119 changes: 60 additions & 59 deletions crates/re_arrow_store/src/store_sanity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ use crate::{DataStore, IndexedBucket, IndexedBucketInner, IndexedTable, Persiste

#[derive(thiserror::Error, Debug)]
pub enum SanityError {
#[error("Reported number of rows for {origin} is out of sync: got {got}, expected {expected}")]
SizeOutOfSync {
origin: &'static str,
expected: u64,
got: u64,
},

#[error("Column '{component}' has too few/many rows: got {got} instead of {expected}")]
ColumnLengthMismatch {
component: ComponentName,
Expand Down Expand Up @@ -53,32 +60,75 @@ impl DataStore {
}
}

// --- Persistent Indices ---
// --- Temporal ---

impl PersistentIndexedTable {
impl IndexedTable {
/// Runs the sanity check suite for the entire table.
///
/// Returns an error if anything looks wrong.
pub fn sanity_check(&self) -> SanityResult<()> {
crate::profile_function!();

// No two buckets should ever overlap time-range-wise.
{
let time_ranges = self
.buckets
.values()
.map(|bucket| bucket.inner.read().time_range)
.collect::<Vec<_>>();
for time_ranges in time_ranges.windows(2) {
let &[t1, t2] = time_ranges else { unreachable!() };
if t1.max.as_i64() >= t2.min.as_i64() {
return Err(SanityError::OverlappingBuckets {
t1_max: t1.max.as_i64(),
t1_max_formatted: self.timeline.typ().format(t1.max),
t2_max: t2.max.as_i64(),
t2_max_formatted: self.timeline.typ().format(t2.max),
});
}
}
}

// Run individual bucket sanity check suites too.
for bucket in self.buckets.values() {
bucket.sanity_check()?;
}

Ok(())
}
}

impl IndexedBucket {
/// Runs the sanity check suite for the entire bucket.
///
/// Returns an error if anything looks wrong.
pub fn sanity_check(&self) -> SanityResult<()> {
crate::profile_function!();

let Self {
ent_path: _,
timeline: _,
cluster_key,
inner,
} = self;

let IndexedBucketInner {
is_sorted: _,
time_range: _,
col_time,
col_insert_id,
col_row_id,
col_num_instances,
columns,
total_size_bytes: _, // TODO(#1619)
} = self;
} = &*inner.read();

// All columns should be `Self::num_rows` long.
{
let num_rows = self.total_rows();
let num_rows = self.num_rows();

let column_lengths = [
(!col_insert_id.is_empty())
.then(|| (DataStore::insert_id_key(), col_insert_id.len())), //
Some((COLUMN_TIMEPOINT.into(), col_time.len())),
Some((COLUMN_ROW_ID.into(), col_row_id.len())),
Some((COLUMN_NUM_INSTANCES.into(), col_num_instances.len())),
]
Expand Down Expand Up @@ -117,73 +167,27 @@ impl PersistentIndexedTable {
}
}

// TODO(#1619): recomputing shouldnt change the size

Ok(())
}
}

// --- Indices ---
// --- Timeless ---

impl IndexedTable {
impl PersistentIndexedTable {
/// Runs the sanity check suite for the entire table.
///
/// Returns an error if anything looks wrong.
pub fn sanity_check(&self) -> SanityResult<()> {
crate::profile_function!();

// No two buckets should ever overlap time-range-wise.
{
let time_ranges = self
.buckets
.values()
.map(|bucket| bucket.inner.read().time_range)
.collect::<Vec<_>>();
for time_ranges in time_ranges.windows(2) {
let &[t1, t2] = time_ranges else { unreachable!() };
if t1.max.as_i64() >= t2.min.as_i64() {
return Err(SanityError::OverlappingBuckets {
t1_max: t1.max.as_i64(),
t1_max_formatted: self.timeline.typ().format(t1.max),
t2_max: t2.max.as_i64(),
t2_max_formatted: self.timeline.typ().format(t2.max),
});
}
}
}

// Run individual bucket sanity check suites too.
for bucket in self.buckets.values() {
bucket.sanity_check()?;
}

Ok(())
}
}

impl IndexedBucket {
/// Runs the sanity check suite for the entire bucket.
///
/// Returns an error if anything looks wrong.
pub fn sanity_check(&self) -> SanityResult<()> {
crate::profile_function!();

let Self {
timeline: _,
ent_path: _,
cluster_key,
inner,
} = self;

let IndexedBucketInner {
is_sorted: _,
time_range: _,
col_time,
col_insert_id,
col_row_id,
col_num_instances,
columns,
total_size_bytes: _, // TODO(#1619)
} = &*inner.read();
} = self;

// All columns should be `Self::num_rows` long.
{
Expand All @@ -192,7 +196,6 @@ impl IndexedBucket {
let column_lengths = [
(!col_insert_id.is_empty())
.then(|| (DataStore::insert_id_key(), col_insert_id.len())), //
Some((COLUMN_TIMEPOINT.into(), col_time.len())),
Some((COLUMN_ROW_ID.into(), col_row_id.len())),
Some((COLUMN_NUM_INSTANCES.into(), col_num_instances.len())),
]
Expand Down Expand Up @@ -231,8 +234,6 @@ impl IndexedBucket {
}
}

// TODO(#1619): recomputing shouldnt change the size

Ok(())
}
}
Loading

0 comments on commit 3bcf851

Please sign in to comment.