Skip to content

Commit

Permalink
Datastore revamp 7: garbage collection
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc committed Apr 12, 2023
1 parent b7bbe10 commit 0f1cd6f
Show file tree
Hide file tree
Showing 34 changed files with 1,226 additions and 499 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

49 changes: 47 additions & 2 deletions crates/re_arrow_store/benches/data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,18 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
use arrow2::array::UnionArray;
use criterion::{criterion_group, criterion_main, Criterion};

use re_arrow_store::{DataStore, DataStoreConfig, LatestAtQuery, RangeQuery, TimeInt, TimeRange};
use re_arrow_store::{
DataStore, DataStoreConfig, GarbageCollectionTarget, LatestAtQuery, RangeQuery, TimeInt,
TimeRange,
};
use re_log_types::{
component_types::{InstanceKey, Rect2D},
datagen::{build_frame_nr, build_some_instances, build_some_rects},
Component as _, ComponentName, DataCell, DataRow, DataTable, EntityPath, RowId, TableId,
TimeType, Timeline,
};

criterion_group!(benches, insert, latest_at, latest_at_missing, range);
criterion_group!(benches, insert, latest_at, latest_at_missing, range, gc);
criterion_main!(benches);

// ---
Expand Down Expand Up @@ -258,6 +261,48 @@ fn range(c: &mut Criterion) {
}
}

fn gc(c: &mut Criterion) {
let mut group = c.benchmark_group(format!(
"datastore/num_rows={NUM_ROWS}/num_instances={NUM_INSTANCES}/gc"
));
group.throughput(criterion::Throughput::Elements(
(NUM_INSTANCES * NUM_ROWS) as _,
));

let mut table = build_table(NUM_INSTANCES as usize, false);
table.compute_all_size_bytes();

// Default config
group.bench_function("default", |b| {
let store = insert_table(Default::default(), InstanceKey::name(), &table);
b.iter(|| {
let mut store = store.clone();
let (_, stats_diff) = store.gc(GarbageCollectionTarget::DropAtLeastFraction(1.0 / 3.0));
stats_diff
});
});

// Emulate more or less bucket
for &num_rows_per_bucket in num_rows_per_bucket() {
group.bench_function(format!("bucketsz={num_rows_per_bucket}"), |b| {
let store = insert_table(
DataStoreConfig {
indexed_bucket_num_rows: num_rows_per_bucket,
..Default::default()
},
InstanceKey::name(),
&table,
);
b.iter(|| {
let mut store = store.clone();
let (_, stats_diff) =
store.gc(GarbageCollectionTarget::DropAtLeastFraction(1.0 / 3.0));
stats_diff
});
});
}
}

// --- Helpers ---

fn build_table(n: usize, packed: bool) -> DataTable {
Expand Down
5 changes: 3 additions & 2 deletions crates/re_arrow_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,12 @@ pub use self::arrow_util::ArrayExt;
pub use self::store::{DataStore, DataStoreConfig};
pub use self::store_gc::GarbageCollectionTarget;
pub use self::store_read::{LatestAtQuery, RangeQuery};
pub use self::store_stats::DataStoreStats;
pub use self::store_stats::{DataStoreRowStats, DataStoreStats};
pub use self::store_write::{WriteError, WriteResult};

pub(crate) use self::store::{
IndexedBucket, IndexedBucketInner, IndexedTable, PersistentIndexedTable,
ClusterCellCache, DataTypeRegistry, IndexedBucket, IndexedBucketInner, IndexedTable,
MetadataRegistry, PersistentIndexedTable,
};

// Re-exports
Expand Down
94 changes: 79 additions & 15 deletions crates/re_arrow_store/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use nohash_hasher::{IntMap, IntSet};
use parking_lot::RwLock;
use re_log_types::{
ComponentName, DataCell, DataCellColumn, EntityPath, EntityPathHash, ErasedTimeVec,
NumInstancesVec, RowId, RowIdVec, TimeInt, TimePoint, TimeRange, Timeline,
NumInstancesVec, RowId, RowIdVec, SizeBytes, TimeInt, TimePoint, TimeRange, Timeline,
};

// --- Data store ---
Expand All @@ -23,8 +23,13 @@ pub struct DataStoreConfig {
/// to a specific timeline _and_ a specific entity.
///
/// This effectively puts an upper bound on the number of rows that need to be sorted when an
/// indexed bucket gets out of order.
/// indexed bucket gets out of order (e.g. because of new insertions or a GC pass).
/// This is a tradeoff: less rows means faster sorts at the cost of more metadata overhead.
/// In particular:
/// - Query performance scales inversely logarithmically to this number (i.e. it gets better
/// the higher this number gets).
/// - GC performance scales quadratically with this number (i.e. it gets better the lower this
/// number gets).
///
/// See [`Self::DEFAULT`] for defaults.
pub indexed_bucket_num_rows: u64,
Expand Down Expand Up @@ -53,7 +58,12 @@ impl Default for DataStoreConfig {

impl DataStoreConfig {
pub const DEFAULT: Self = Self {
indexed_bucket_num_rows: 1024,
// NOTE: Empirical testing has shown that 512 is a good balance between sorting
// and binary search costs with the current GC implementation.
//
// Garbage collection costs are entirely driven by the number of buckets around, the size
// of the data itself has no impact.
indexed_bucket_num_rows: 512,
store_insert_ids: cfg!(debug_assertions),
enable_typecheck: cfg!(debug_assertions),
};
Expand All @@ -67,8 +77,8 @@ pub type InsertIdVec = SmallVec<[u64; 4]>;
/// so far.
///
/// See also [`DataStore::lookup_datatype`].
#[derive(Default)]
pub struct DataTypeRegistry(IntMap<ComponentName, DataType>);
#[derive(Debug, Default, Clone)]
pub struct DataTypeRegistry(pub IntMap<ComponentName, DataType>);

impl std::ops::Deref for DataTypeRegistry {
type Target = IntMap<ComponentName, DataType>;
Expand All @@ -87,11 +97,11 @@ impl std::ops::DerefMut for DataTypeRegistry {
}

/// Keeps track of arbitrary per-row metadata.
#[derive(Default)]
pub struct MetadataRegistry<T: Clone>(HashMap<RowId, T>);
#[derive(Debug, Default, Clone)]
pub struct MetadataRegistry<T: Clone>(pub BTreeMap<RowId, T>);

impl<T: Clone> std::ops::Deref for MetadataRegistry<T> {
type Target = HashMap<RowId, T>;
type Target = BTreeMap<RowId, T>;

#[inline]
fn deref(&self) -> &Self::Target {
Expand All @@ -106,6 +116,29 @@ impl<T: Clone> std::ops::DerefMut for MetadataRegistry<T> {
}
}

/// Used to cache auto-generated cluster cells (`[0]`, `[0, 1]`, `[0, 1, 2]`, ...) so that they
/// can be properly deduplicated on insertion.
#[derive(Debug, Default, Clone)]
pub struct ClusterCellCache(pub IntMap<u32, DataCell>);

impl std::ops::Deref for ClusterCellCache {
type Target = IntMap<u32, DataCell>;

#[inline]
fn deref(&self) -> &Self::Target {
&self.0
}
}

impl std::ops::DerefMut for ClusterCellCache {
#[inline]
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}

// ---

/// A complete data store: covers all timelines, all entities, everything.
///
/// ## Debugging
Expand Down Expand Up @@ -148,7 +181,7 @@ pub struct DataStore {

/// Used to cache auto-generated cluster cells (`[0]`, `[0, 1]`, `[0, 1, 2]`, ...)
/// so that they can be properly deduplicated on insertion.
pub(crate) cluster_cell_cache: IntMap<u32, DataCell>,
pub(crate) cluster_cell_cache: ClusterCellCache,

/// All temporal [`IndexedTable`]s for all entities on all timelines.
///
Expand All @@ -167,10 +200,29 @@ pub struct DataStore {
pub(crate) query_id: AtomicU64,

/// Monotonically increasing ID for GCs.
#[allow(dead_code)]
pub(crate) gc_id: u64,
}

impl Clone for DataStore {
fn clone(&self) -> Self {
Self {
cluster_key: self.cluster_key,
config: self.config.clone(),
type_registry: self.type_registry.clone(),
metadata_registry: self.metadata_registry.clone(),
cluster_cell_cache: self.cluster_cell_cache.clone(),
tables: self.tables.clone(),
timeless_tables: self.timeless_tables.clone(),
insert_id: self.insert_id,
query_id: self
.query_id
.load(std::sync::atomic::Ordering::Relaxed)
.into(),
gc_id: self.gc_id,
}
}
}

impl DataStore {
/// See [`Self::cluster_key`] for more information about the cluster key.
pub fn new(cluster_key: ComponentName, config: DataStoreConfig) -> Self {
Expand Down Expand Up @@ -293,7 +345,7 @@ fn datastore_internal_repr() {
/// ```
//
// TODO(#1524): inline visualization once it's back to a manageable state
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct IndexedTable {
/// The timeline this table operates in, for debugging purposes.
pub timeline: Timeline,
Expand Down Expand Up @@ -336,7 +388,7 @@ pub struct IndexedTable {
impl IndexedTable {
pub fn new(cluster_key: ComponentName, timeline: Timeline, ent_path: EntityPath) -> Self {
let bucket = IndexedBucket::new(cluster_key, timeline);
let buckets_size_bytes = bucket.size_bytes();
let buckets_size_bytes = bucket.total_size_bytes();
Self {
timeline,
ent_path,
Expand Down Expand Up @@ -364,6 +416,16 @@ pub struct IndexedBucket {
pub inner: RwLock<IndexedBucketInner>,
}

impl Clone for IndexedBucket {
fn clone(&self) -> Self {
Self {
timeline: self.timeline,
cluster_key: self.cluster_key,
inner: RwLock::new(self.inner.read().clone()),
}
}
}

impl IndexedBucket {
fn new(cluster_key: ComponentName, timeline: Timeline) -> Self {
Self {
Expand All @@ -375,7 +437,7 @@ impl IndexedBucket {
}

/// See [`IndexedBucket`]; this is a helper struct to simplify interior mutability.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct IndexedBucketInner {
/// Are the rows in this table chunk sorted?
///
Expand Down Expand Up @@ -412,7 +474,8 @@ pub struct IndexedBucketInner {
/// (i.e. the table is sparse).
pub columns: IntMap<ComponentName, DataCellColumn>,

/// The size of both the control & component data stored in this bucket, in bytes.
/// The size of both the control & component data stored in this bucket, heap and stack
/// included, in bytes.
///
/// This is a best-effort approximation, adequate for most purposes (stats,
/// triggering GCs, ...).
Expand Down Expand Up @@ -449,7 +512,8 @@ impl Default for IndexedBucketInner {
/// ```
//
// TODO(#1524): inline visualization once it's back to a manageable state
#[derive(Debug)]
// TODO(#1807): timeless should be row-id ordered too then
#[derive(Debug, Clone)]
pub struct PersistentIndexedTable {
/// The entity this table is related to, for debugging purposes.
pub ent_path: EntityPath,
Expand Down
25 changes: 18 additions & 7 deletions crates/re_arrow_store/src/store_arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ fn serialize(
let mut schema = Schema::default();
let mut columns = Vec::new();

// NOTE: Empty table / bucket.
if col_row_id.is_empty() {
return Ok((schema, Chunk::new(columns)));
}

{
let (control_schema, control_columns) =
serialize_control_columns(col_time, col_insert_id, col_row_id, col_num_instances)?;
Expand Down Expand Up @@ -135,10 +140,13 @@ fn serialize_control_columns(
// - time
// - num_instances

let (insert_id_field, insert_id_column) =
DataTable::serialize_primitive_column(COLUMN_INSERT_ID, col_insert_id, None)?;
schema.fields.push(insert_id_field);
columns.push(insert_id_column);
// NOTE: Optional column, so make sure it's actually there:
if !col_insert_id.is_empty() {
let (insert_id_field, insert_id_column) =
DataTable::serialize_primitive_column(COLUMN_INSERT_ID, col_insert_id, None)?;
schema.fields.push(insert_id_field);
columns.push(insert_id_column);
}

let (row_id_field, row_id_column) =
DataTable::serialize_control_column(COLUMN_ROW_ID, col_row_id)?;
Expand Down Expand Up @@ -187,9 +195,12 @@ fn serialize_data_columns(
}

for (component, column) in table {
let (field, column) = DataTable::serialize_data_column(component.as_str(), column)?;
schema.fields.push(field);
columns.push(column);
// NOTE: Don't serialize columns with only null values.
if column.iter().any(Option::is_some) {
let (field, column) = DataTable::serialize_data_column(component.as_str(), column)?;
schema.fields.push(field);
columns.push(column);
}
}

Ok((schema, columns))
Expand Down
4 changes: 2 additions & 2 deletions crates/re_arrow_store/src/store_dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl DataStore {
col_row_id: col_row_id.clone(),
col_timelines: Default::default(),
col_entity_path: std::iter::repeat_with(|| ent_path.clone())
.take(table.total_rows() as _)
.take(table.num_rows() as _)
.collect(),
col_num_instances: col_num_instances.clone(),
columns: columns.clone(), // shallow
Expand Down Expand Up @@ -89,7 +89,7 @@ impl DataStore {
col_timelines: [(*timeline, col_time.iter().copied().map(Some).collect())]
.into(),
col_entity_path: std::iter::repeat_with(|| table.ent_path.clone())
.take(table.total_rows() as _)
.take(table.num_rows() as _)
.collect(),
col_num_instances: col_num_instances.clone(),
columns: columns.clone(), // shallow
Expand Down
Loading

0 comments on commit 0f1cd6f

Please sign in to comment.