Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc committed Apr 4, 2023
1 parent 05f4876 commit 5cd5814
Show file tree
Hide file tree
Showing 23 changed files with 694 additions and 322 deletions.
2 changes: 1 addition & 1 deletion crates/re_arrow_store/benches/data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ fn build_table(n: usize, packed: bool) -> DataTable {
// Do a serialization roundtrip to pack everything in contiguous memory.
if packed {
let (schema, columns) = table.serialize().unwrap();
table = DataTable::deserialize(MsgId::ZERO, &schema, &columns).unwrap();
table = DataTable::deserialize(MsgId::ZERO, &schema, &columns, false).unwrap();
}

table
Expand Down
68 changes: 35 additions & 33 deletions crates/re_arrow_store/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ fn datastore_internal_repr() {
},
);

let timeless = DataTable::example(false);
let timeless = DataTable::example(true);
eprintln!("{timeless}");
store.insert_table(&timeless).unwrap();

Expand Down Expand Up @@ -317,38 +317,31 @@ pub struct IndexedTable {
/// to free up space.
pub all_components: IntSet<ComponentName>,

/// The total number of rows in this indexed table, accounting for all buckets.
pub total_rows: u64,
/// The number of rows stored in this table, across all of its buckets.
pub buckets_num_rows: u64,

/// The size of this table in bytes across all of its buckets, accounting for both data and
/// metadata.
/// The size of both the control & component data stored in this table, across all of its
/// buckets, in bytes.
///
/// Accurately computing the size of arrow arrays is surprisingly costly, which is why we
/// cache this.
/// Also: there are many buckets.
pub total_size_bytes: u64,
/// This is a best-effort approximation, adequate for most purposes (stats,
/// triggering GCs, ...).
pub buckets_size_bytes: u64,
}

impl IndexedTable {
pub fn new(cluster_key: ComponentName, timeline: Timeline, ent_path: EntityPath) -> Self {
let bucket = IndexedBucket::new(cluster_key, timeline);
let buckets_size_bytes = bucket.size_bytes();
Self {
timeline,
ent_path,
buckets: [(i64::MIN.into(), IndexedBucket::new(cluster_key, timeline))].into(),
buckets: [(i64::MIN.into(), bucket)].into(),
cluster_key,
all_components: Default::default(),
total_rows: 0,
total_size_bytes: 0, // TODO(#1619)
buckets_num_rows: 0,
buckets_size_bytes,
}
}

/// Returns a read-only iterator over the raw buckets.
///
/// Do _not_ use this to try and test the internal state of the datastore.
#[doc(hidden)]
pub fn iter_buckets(&self) -> impl ExactSizeIterator<Item = &IndexedBucket> {
self.buckets.values()
}
}

/// An `IndexedBucket` holds a chunk of rows from an [`IndexedTable`]
Expand Down Expand Up @@ -414,25 +407,29 @@ pub struct IndexedBucketInner {
/// (i.e. the table is sparse).
pub columns: IntMap<ComponentName, DataCellColumn>,

/// The size of this bucket in bytes, accounting for both data and metadata.
/// The size of both the control & component data stored in this bucket, in bytes.
///
/// Accurately computing the size of arrow arrays is surprisingly costly, which is why we
/// cache this.
pub total_size_bytes: u64,
/// This is a best-effort approximation, adequate for most purposes (stats,
/// triggering GCs, ...).
///
/// We cache this because there can be many, many buckets.
pub size_bytes: u64,
}

impl Default for IndexedBucketInner {
fn default() -> Self {
Self {
let mut this = Self {
is_sorted: true,
time_range: TimeRange::new(i64::MAX.into(), i64::MIN.into()),
col_time: Default::default(),
col_insert_id: Default::default(),
col_row_id: Default::default(),
col_num_instances: Default::default(),
columns: Default::default(),
total_size_bytes: 0, // TODO(#1619)
}
size_bytes: 0, // NOTE: computed below
};
this.compute_size_bytes();
this
}
}

Expand Down Expand Up @@ -476,15 +473,20 @@ pub struct PersistentIndexedTable {
/// The cells are optional since not all rows will have data for every single component
/// (i.e. the table is sparse).
pub columns: IntMap<ComponentName, DataCellColumn>,

/// The size of this indexed table in bytes, accounting for both data and metadata.
///
/// Accurately computing the size of arrow arrays is surprisingly costly, which is why we
/// cache this.
pub total_size_bytes: u64,
}

impl PersistentIndexedTable {
pub fn new(cluster_key: ComponentName, ent_path: EntityPath) -> Self {
Self {
cluster_key,
ent_path,
col_insert_id: Default::default(),
col_row_id: Default::default(),
col_num_instances: Default::default(),
columns: Default::default(),
}
}

pub fn is_empty(&self) -> bool {
self.col_num_instances.is_empty()
}
Expand Down
3 changes: 1 addition & 2 deletions crates/re_arrow_store/src/store_arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl IndexedBucket {
col_row_id,
col_num_instances,
columns,
total_size_bytes: _,
size_bytes: _,
} = &*inner.read();

serialize(
Expand Down Expand Up @@ -72,7 +72,6 @@ impl PersistentIndexedTable {
col_row_id,
col_num_instances,
columns,
total_size_bytes: _,
} = self;

serialize(
Expand Down
17 changes: 8 additions & 9 deletions crates/re_arrow_store/src/store_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ impl std::fmt::Display for DataStore {
format!(
"{} timeless indexed tables, for a total of {} across {} total rows\n",
timeless_tables.len(),
format_bytes(self.total_timeless_index_size_bytes() as _),
format_number(self.total_timeless_index_rows() as _)
format_bytes(self.total_timeless_size_bytes() as _),
format_number(self.total_timeless_rows() as _)
),
))?;
f.write_str(&indent::indent_all_by(4, "timeless_tables: [\n"))?;
Expand All @@ -53,8 +53,8 @@ impl std::fmt::Display for DataStore {
format!(
"{} indexed tables, for a total of {} across {} total rows\n",
tables.len(),
format_bytes(self.total_temporal_index_size_bytes() as _),
format_number(self.total_temporal_index_rows() as _)
format_bytes(self.total_temporal_size_bytes() as _),
format_number(self.total_temporal_rows() as _)
),
))?;
f.write_str(&indent::indent_all_by(4, "tables: [\n"))?;
Expand Down Expand Up @@ -83,8 +83,8 @@ impl std::fmt::Display for IndexedTable {
buckets,
cluster_key: _,
all_components: _,
total_rows: _,
total_size_bytes: _,
buckets_num_rows: _,
buckets_size_bytes: _,
} = self;

f.write_fmt(format_args!("timeline: {}\n", timeline.name()))?;
Expand Down Expand Up @@ -116,8 +116,8 @@ impl std::fmt::Display for IndexedBucket {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_fmt(format_args!(
"size: {} across {} rows\n",
format_bytes(self.total_size_bytes() as _),
format_number(self.total_rows() as _),
format_bytes(self.size_bytes() as _),
format_number(self.num_rows() as _),
))?;

let time_range = {
Expand Down Expand Up @@ -156,7 +156,6 @@ impl std::fmt::Display for PersistentIndexedTable {
col_row_id: _,
col_num_instances: _,
columns: _,
total_size_bytes: _,
} = self;

f.write_fmt(format_args!("entity: {ent_path}\n"))?;
Expand Down
3 changes: 1 addition & 2 deletions crates/re_arrow_store/src/store_polars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ impl PersistentIndexedTable {
col_row_id,
col_num_instances,
columns,
total_size_bytes: _,
} = self;

let num_rows = self.total_rows() as usize;
Expand Down Expand Up @@ -217,7 +216,7 @@ impl IndexedBucket {
col_row_id,
col_num_instances,
columns,
total_size_bytes: _,
size_bytes: _,
} = &*self.inner.read();

let (_, times) = DataTable::serialize_primitive_column(
Expand Down
6 changes: 3 additions & 3 deletions crates/re_arrow_store/src/store_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@ impl IndexedBucket {
col_row_id: _,
col_num_instances: _,
columns,
total_size_bytes: _, // TODO(#1619)
size_bytes: _,
} = &*self.inner.read();
debug_assert!(is_sorted);

Expand Down Expand Up @@ -761,7 +761,7 @@ impl IndexedBucket {
col_row_id,
col_num_instances: _,
columns,
total_size_bytes: _, // TODO(#1619)
size_bytes: _,
} = &*self.inner.read();
debug_assert!(is_sorted);

Expand Down Expand Up @@ -873,7 +873,7 @@ impl IndexedBucketInner {
col_row_id,
col_num_instances,
columns,
total_size_bytes: _,
size_bytes: _,
} = self;

if *is_sorted {
Expand Down
Loading

0 comments on commit 5cd5814

Please sign in to comment.