Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Datastore revamp 3: efficient incremental stats #1739

Merged
1 commit merged on Apr 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/re_arrow_store/benches/arrow2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ fn estimated_size_bytes(c: &mut Criterion) {

{
let cells = generate_cells(kind);
let arrays = cells.iter().map(|cell| cell.as_arrow()).collect_vec();
let arrays = cells.iter().map(|cell| cell.to_arrow()).collect_vec();
let total_instances = arrays.iter().map(|array| array.len() as u32).sum::<u32>();
assert_eq!(total_instances, (NUM_ROWS * NUM_INSTANCES) as u32);

Expand Down
2 changes: 1 addition & 1 deletion crates/re_arrow_store/benches/arrow2_convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ fn deserialize(c: &mut Criterion) {
group.throughput(criterion::Throughput::Elements(NUM_INSTANCES as _));

let cell = DataCell::from_component::<InstanceKey>(0..NUM_INSTANCES as u64);
let data = cell.as_arrow();
let data = cell.to_arrow();

{
group.bench_function("arrow2_convert", |b| {
Expand Down
68 changes: 35 additions & 33 deletions crates/re_arrow_store/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ fn datastore_internal_repr() {
},
);

let timeless = DataTable::example(false);
let timeless = DataTable::example(true);
eprintln!("{timeless}");
store.insert_table(&timeless).unwrap();

Expand Down Expand Up @@ -317,38 +317,31 @@ pub struct IndexedTable {
/// to free up space.
pub all_components: IntSet<ComponentName>,

/// The total number of rows in this indexed table, accounting for all buckets.
pub total_rows: u64,
/// The number of rows stored in this table, across all of its buckets.
pub buckets_num_rows: u64,

/// The size of this table in bytes across all of its buckets, accounting for both data and
/// metadata.
/// The size of both the control & component data stored in this table, across all of its
/// buckets, in bytes.
///
/// Accurately computing the size of arrow arrays is surprisingly costly, which is why we
/// cache this.
/// Also: there are many buckets.
pub total_size_bytes: u64,
/// This is a best-effort approximation, adequate for most purposes (stats,
/// triggering GCs, ...).
pub buckets_size_bytes: u64,
}

impl IndexedTable {
pub fn new(cluster_key: ComponentName, timeline: Timeline, ent_path: EntityPath) -> Self {
let bucket = IndexedBucket::new(cluster_key, timeline);
let buckets_size_bytes = bucket.size_bytes();
Self {
timeline,
ent_path,
buckets: [(i64::MIN.into(), IndexedBucket::new(cluster_key, timeline))].into(),
buckets: [(i64::MIN.into(), bucket)].into(),
cluster_key,
all_components: Default::default(),
total_rows: 0,
total_size_bytes: 0, // TODO(#1619)
buckets_num_rows: 0,
buckets_size_bytes,
}
}

/// Returns a read-only iterator over the raw buckets.
///
/// Do _not_ use this to try and test the internal state of the datastore.
#[doc(hidden)]
pub fn iter_buckets(&self) -> impl ExactSizeIterator<Item = &IndexedBucket> {
self.buckets.values()
}
}

/// An `IndexedBucket` holds a chunk of rows from an [`IndexedTable`]
Expand Down Expand Up @@ -414,25 +407,29 @@ pub struct IndexedBucketInner {
/// (i.e. the table is sparse).
pub columns: IntMap<ComponentName, DataCellColumn>,

/// The size of this bucket in bytes, accounting for both data and metadata.
/// The size of both the control & component data stored in this bucket, in bytes.
///
/// Accurately computing the size of arrow arrays is surprisingly costly, which is why we
/// cache this.
pub total_size_bytes: u64,
/// This is a best-effort approximation, adequate for most purposes (stats,
/// triggering GCs, ...).
///
/// We cache this because there can be many, many buckets.
pub size_bytes: u64,
}

impl Default for IndexedBucketInner {
fn default() -> Self {
Self {
let mut this = Self {
is_sorted: true,
time_range: TimeRange::new(i64::MAX.into(), i64::MIN.into()),
col_time: Default::default(),
col_insert_id: Default::default(),
col_row_id: Default::default(),
col_num_instances: Default::default(),
columns: Default::default(),
total_size_bytes: 0, // TODO(#1619)
}
size_bytes: 0, // NOTE: computed below
};
this.compute_size_bytes();
this
}
}

Expand Down Expand Up @@ -476,15 +473,20 @@ pub struct PersistentIndexedTable {
/// The cells are optional since not all rows will have data for every single component
/// (i.e. the table is sparse).
pub columns: IntMap<ComponentName, DataCellColumn>,

/// The size of this indexed table in bytes, accounting for both data and metadata.
///
/// Accurately computing the size of arrow arrays is surprisingly costly, which is why we
/// cache this.
pub total_size_bytes: u64,
}

impl PersistentIndexedTable {
pub fn new(cluster_key: ComponentName, ent_path: EntityPath) -> Self {
Self {
cluster_key,
ent_path,
col_insert_id: Default::default(),
col_row_id: Default::default(),
col_num_instances: Default::default(),
columns: Default::default(),
}
}

pub fn is_empty(&self) -> bool {
self.col_num_instances.is_empty()
}
Expand Down
3 changes: 1 addition & 2 deletions crates/re_arrow_store/src/store_arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl IndexedBucket {
col_row_id,
col_num_instances,
columns,
total_size_bytes: _,
size_bytes: _,
} = &*inner.read();

serialize(
Expand Down Expand Up @@ -72,7 +72,6 @@ impl PersistentIndexedTable {
col_row_id,
col_num_instances,
columns,
total_size_bytes: _,
} = self;

serialize(
Expand Down
17 changes: 8 additions & 9 deletions crates/re_arrow_store/src/store_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ impl std::fmt::Display for DataStore {
format!(
"{} timeless indexed tables, for a total of {} across {} total rows\n",
timeless_tables.len(),
format_bytes(self.total_timeless_index_size_bytes() as _),
format_number(self.total_timeless_index_rows() as _)
format_bytes(self.total_timeless_size_bytes() as _),
format_number(self.total_timeless_rows() as _)
),
))?;
f.write_str(&indent::indent_all_by(4, "timeless_tables: [\n"))?;
Expand All @@ -53,8 +53,8 @@ impl std::fmt::Display for DataStore {
format!(
"{} indexed tables, for a total of {} across {} total rows\n",
tables.len(),
format_bytes(self.total_temporal_index_size_bytes() as _),
format_number(self.total_temporal_index_rows() as _)
format_bytes(self.total_temporal_size_bytes() as _),
format_number(self.total_temporal_rows() as _)
),
))?;
f.write_str(&indent::indent_all_by(4, "tables: [\n"))?;
Expand Down Expand Up @@ -83,8 +83,8 @@ impl std::fmt::Display for IndexedTable {
buckets,
cluster_key: _,
all_components: _,
total_rows: _,
total_size_bytes: _,
buckets_num_rows: _,
buckets_size_bytes: _,
} = self;

f.write_fmt(format_args!("timeline: {}\n", timeline.name()))?;
Expand Down Expand Up @@ -116,8 +116,8 @@ impl std::fmt::Display for IndexedBucket {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_fmt(format_args!(
"size: {} across {} rows\n",
format_bytes(self.total_size_bytes() as _),
format_number(self.total_rows() as _),
format_bytes(self.size_bytes() as _),
format_number(self.num_rows() as _),
))?;

let time_range = {
Expand Down Expand Up @@ -156,7 +156,6 @@ impl std::fmt::Display for PersistentIndexedTable {
col_row_id: _,
col_num_instances: _,
columns: _,
total_size_bytes: _,
} = self;

f.write_fmt(format_args!("entity: {ent_path}\n"))?;
Expand Down
3 changes: 1 addition & 2 deletions crates/re_arrow_store/src/store_polars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ impl PersistentIndexedTable {
col_row_id,
col_num_instances,
columns,
total_size_bytes: _,
} = self;

let num_rows = self.total_rows() as usize;
Expand Down Expand Up @@ -217,7 +216,7 @@ impl IndexedBucket {
col_row_id,
col_num_instances,
columns,
total_size_bytes: _,
size_bytes: _,
} = &*self.inner.read();

let (_, times) = DataTable::serialize_primitive_column(
Expand Down
10 changes: 5 additions & 5 deletions crates/re_arrow_store/src/store_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ impl DataStore {
/// .map(|cell| {
/// Series::try_from((
/// cell.component_name().as_str(),
/// cell.as_arrow(),
/// cell.to_arrow(),
/// ))
/// })
/// .collect();
Expand Down Expand Up @@ -332,7 +332,7 @@ impl DataStore {
/// # .map(|cell| {
/// # Series::try_from((
/// # cell.component_name().as_str(),
/// # cell.as_arrow(),
/// # cell.to_arrow(),
/// # ))
/// # })
/// # .collect();
Expand Down Expand Up @@ -672,7 +672,7 @@ impl IndexedBucket {
col_row_id: _,
col_num_instances: _,
columns,
total_size_bytes: _, // TODO(#1619)
size_bytes: _,
} = &*self.inner.read();
debug_assert!(is_sorted);

Expand Down Expand Up @@ -787,7 +787,7 @@ impl IndexedBucket {
col_row_id,
col_num_instances: _,
columns,
total_size_bytes: _, // TODO(#1619)
size_bytes: _,
} = &*self.inner.read();
debug_assert!(is_sorted);

Expand Down Expand Up @@ -899,7 +899,7 @@ impl IndexedBucketInner {
col_row_id,
col_num_instances,
columns,
total_size_bytes: _,
size_bytes: _,
} = self;

if *is_sorted {
Expand Down
Loading