GC improvements 0: use-case driven benchmarks #4394

Merged · 5 commits · Dec 2, 2023

Changes from all commits
15 changes: 7 additions & 8 deletions crates/re_arrow_store/Cargo.toml
@@ -19,9 +19,6 @@ all-features = true
[features]
default = []

-## Enables access to re_types' test components/datatypes.
-testing = ["re_log_types/testing", "re_types/testing"]

## Enables `parking_lot`'s deadlock detection background thread.
deadlock_detection = ["parking_lot/deadlock_detection"]

@@ -73,7 +70,7 @@ polars-ops = { workspace = true, optional = true, features = [

[dev-dependencies]
re_log_types = { workspace = true, features = ["testing"] }
-re_types = { workspace = true, features = ["datagen"] }
+re_types = { workspace = true, features = ["datagen", "testing"] }

anyhow.workspace = true
criterion.workspace = true
@@ -98,7 +95,7 @@ bench = false
[[example]]
name = "dump_dataframe"
path = "examples/dump_dataframe.rs"
required-features = ["polars", "testing"]
required-features = ["polars"]

[[example]]
name = "latest_component"
@@ -116,15 +113,17 @@ path = "examples/range_components.rs"
required-features = ["polars"]


+[[bench]]
+name = "arrow2"
+harness = false

[[bench]]
name = "data_store"
harness = false
-required-features = ["testing"]

[[bench]]
name = "arrow2"
name = "gc"
harness = false
-required-features = ["testing"]

[[bench]]
name = "vectors"
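With the bench registered above (and the old crate-level `testing` feature folded into the dev-dependencies), the new suite should be runnable in isolation with something like `cargo bench -p re_arrow_store --bench gc`; `harness = false` hands `main` over to Criterion rather than the default libtest harness.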
265 changes: 265 additions & 0 deletions crates/re_arrow_store/benches/gc.rs
@@ -0,0 +1,265 @@
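// NOTE: GC stress-tests the allocator as much as the store itself; pinning
// mimalloc here presumably matches what Rerun ships in release builds.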
#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;

use criterion::{criterion_group, criterion_main, BatchSize, Criterion};

use itertools::Itertools;
use re_arrow_store::{
    DataStore, DataStoreConfig, GarbageCollectionOptions, GarbageCollectionTarget,
};
use re_log_types::{
    build_frame_nr, build_log_time, DataRow, DataTable, EntityPath, RowId, TableId, Time, TimePoint,
};
use re_types::components::InstanceKey;
use re_types_core::{AsComponents, ComponentBatch, ComponentName, Loggable as _};

criterion_group!(benches, plotting_dashboard, timeless_logs);
criterion_main!(benches);

// ---

#[cfg(not(debug_assertions))]
mod constants {
    pub const NUM_ENTITY_PATHS: usize = 20;
    pub const NUM_ROWS_PER_ENTITY_PATH: usize = 10_000;
}

// `cargo test` also runs the benchmark setup code, so make sure they run quickly:
#[cfg(debug_assertions)]
mod constants {
    pub const NUM_ENTITY_PATHS: usize = 1;
    pub const NUM_ROWS_PER_ENTITY_PATH: usize = 1;
}

use constants::{NUM_ENTITY_PATHS, NUM_ROWS_PER_ENTITY_PATH};

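// Bucket sizes to sweep over; with the `core_benchmarks_only` feature enabled
// the sweep is empty and only the default config gets benchmarked.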
fn num_rows_per_bucket() -> &'static [u64] {
    #[cfg(feature = "core_benchmarks_only")]
    {
        &[]
    }
    #[cfg(not(feature = "core_benchmarks_only"))]
    {
        &[256, 512, 1024, 2048]
    }
}

// --- Benchmarks ---

fn plotting_dashboard(c: &mut Criterion) {
    const DROP_AT_LEAST: f64 = 0.3;

    let mut group = c.benchmark_group(format!(
        "datastore/num_entities={NUM_ENTITY_PATHS}/num_rows_per_entity={NUM_ROWS_PER_ENTITY_PATH}/plotting_dashboard/drop_at_least={DROP_AT_LEAST}"
    ));
    group.throughput(criterion::Throughput::Elements(
        ((NUM_ENTITY_PATHS * NUM_ROWS_PER_ENTITY_PATH) as f64 * DROP_AT_LEAST) as _,
    ));
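    // In release builds that's 20 entities * 10_000 rows * 0.3 = 60_000 dropped rows per iteration.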
    group.sample_size(10);

    let gc_settings = GarbageCollectionOptions {
        target: GarbageCollectionTarget::DropAtLeastFraction(DROP_AT_LEAST),
        gc_timeless: true,
        protect_latest: 1,
        purge_empty_tables: false,
        dont_protect: Default::default(),
    };

    let mut timegen = |i| {
        [
            build_log_time(Time::from_seconds_since_epoch(i as _)),
            build_frame_nr((i as i64).into()),
        ]
        .into()
    };

    let mut datagen = |i| {
        Box::new(re_types::archetypes::TimeSeriesScalar::new(i as f64)) as Box<dyn AsComponents>
    };

    // Default config
    group.bench_function("default", |b| {
        let store = build_store(
            Default::default(),
            InstanceKey::name(),
            false,
            &mut timegen,
            &mut datagen,
        );
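        // `iter_batched` re-creates its input outside the timed section: every
        // run below gets a fresh clone of the pre-filled store, so only the
        // `gc` call itself is measured.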
        b.iter_batched(
            || store.clone(),
            |mut store| {
                let (_, stats_diff) = store.gc(&gc_settings);
                stats_diff
            },
            BatchSize::LargeInput,
        );
    });

    // Emulate more or fewer buckets.
    for &num_rows_per_bucket in num_rows_per_bucket() {
        group.bench_function(format!("bucketsz={num_rows_per_bucket}"), |b| {
            let store = build_store(
                DataStoreConfig {
                    indexed_bucket_num_rows: num_rows_per_bucket,
                    ..Default::default()
                },
                InstanceKey::name(),
                false,
                &mut timegen,
                &mut datagen,
            );
            b.iter_batched(
                || store.clone(),
                |mut store| {
                    let (_, stats_diff) = store.gc(&gc_settings);
                    stats_diff
                },
                BatchSize::LargeInput,
            );
        });
    }
}

fn timeless_logs(c: &mut Criterion) {
    const DROP_AT_LEAST: f64 = 0.3;

    let mut group = c.benchmark_group(format!(
        "datastore/num_entities={NUM_ENTITY_PATHS}/num_rows_per_entity={NUM_ROWS_PER_ENTITY_PATH}/timeless_logs/drop_at_least={DROP_AT_LEAST}"
    ));
    group.throughput(criterion::Throughput::Elements(
        ((NUM_ENTITY_PATHS * NUM_ROWS_PER_ENTITY_PATH) as f64 * DROP_AT_LEAST) as _,
    ));
    group.sample_size(10);

    let gc_settings = GarbageCollectionOptions {
        target: GarbageCollectionTarget::DropAtLeastFraction(DROP_AT_LEAST),
        gc_timeless: true,
        protect_latest: 1,
        purge_empty_tables: false,
        dont_protect: Default::default(),
    };

    let mut timegen = |_| TimePoint::timeless();

    let mut datagen = |i: usize| {
        Box::new(re_types::archetypes::TextLog::new(i.to_string())) as Box<dyn AsComponents>
    };

    // Default config
    group.bench_function("default", |b| {
        let store = build_store(
            Default::default(),
            InstanceKey::name(),
            false,
            &mut timegen,
            &mut datagen,
        );
        b.iter_batched(
            || store.clone(),
            |mut store| {
                let (_, stats_diff) = store.gc(&gc_settings);
                stats_diff
            },
            BatchSize::LargeInput,
        );
    });

    // Emulate more or fewer buckets.
    for &num_rows_per_bucket in num_rows_per_bucket() {
        group.bench_function(format!("bucketsz={num_rows_per_bucket}"), |b| {
            let store = build_store(
                DataStoreConfig {
                    indexed_bucket_num_rows: num_rows_per_bucket,
                    ..Default::default()
                },
                InstanceKey::name(),
                false,
                &mut timegen,
                &mut datagen,
            );
            b.iter_batched(
                || store.clone(),
                |mut store| {
                    let (_, stats_diff) = store.gc(&gc_settings);
                    stats_diff
                },
                BatchSize::LargeInput,
            );
        });
    }
}

// --- Helpers ---

fn build_store<FT, FD>(
    config: DataStoreConfig,
    cluster_key: ComponentName,
    packed: bool,
    timegen: &mut FT,
    datagen: &mut FD,
) -> DataStore
where
    FT: FnMut(usize) -> TimePoint,
    FD: FnMut(usize) -> Box<dyn AsComponents>,
{
    let mut store = DataStore::new(
        re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
        cluster_key,
        config,
    );

    let tables = (0..NUM_ENTITY_PATHS)
        .map(|i| build_table(format!("entity_path_{i}").into(), packed, timegen, datagen))
        .collect_vec();
    let mut rows_per_table = tables.iter().map(|table| table.to_rows()).collect_vec();

    // NOTE: interleave insertions between entities to more closely match real-world scenarios.
    for _ in 0..NUM_ROWS_PER_ENTITY_PATH {
        #[allow(clippy::needless_range_loop)] // readability
        for i in 0..NUM_ENTITY_PATHS {
            let row = rows_per_table[i].next().unwrap();
            store.insert_row(&row.unwrap()).unwrap();
        }
    }

    store
}

fn build_table<FT, FD>(
    entity_path: EntityPath,
    packed: bool,
    timegen: &mut FT,
    datagen: &mut FD,
) -> DataTable
where
    FT: FnMut(usize) -> TimePoint,
    FD: FnMut(usize) -> Box<dyn AsComponents>,
{
    let mut table = DataTable::from_rows(
        TableId::ZERO,
        (0..NUM_ROWS_PER_ENTITY_PATH).map(move |i| {
            DataRow::from_component_batches(
                RowId::random(),
                // NOTE: insert in multiple timelines to more closely match real-world scenarios.
                timegen(i),
                entity_path.clone(),
                datagen(i)
                    .as_component_batches()
                    .iter()
                    .map(|batch| batch as &dyn ComponentBatch),
            )
            .unwrap()
        }),
    );

    // Do a serialization roundtrip to pack everything in contiguous memory.
    if packed {
        let (schema, columns) = table.serialize().unwrap();
        table = DataTable::deserialize(TableId::ZERO, &schema, &columns).unwrap();
    }

    table.compute_all_size_bytes();

    table
}
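Side note for reviewers: the `timegen`/`datagen` split means a new use case is only a pair of closures away. A hypothetical sketch (not part of this PR), built strictly from the helpers above — it would also need to be registered in `criterion_group!` at the top of the file:

fn stamped_text_logs(c: &mut Criterion) {
    let mut group = c.benchmark_group("datastore/stamped_text_logs");
    group.sample_size(10);

    // Same two-timelines stamping as `plotting_dashboard`, but logging text.
    let mut timegen = |i| {
        [
            build_log_time(Time::from_seconds_since_epoch(i as _)),
            build_frame_nr((i as i64).into()),
        ]
        .into()
    };
    let mut datagen = |i: usize| {
        Box::new(re_types::archetypes::TextLog::new(i.to_string())) as Box<dyn AsComponents>
    };

    group.bench_function("default", |b| {
        let store = build_store(
            Default::default(),
            InstanceKey::name(),
            false,
            &mut timegen,
            &mut datagen,
        );
        b.iter_batched(
            || store.clone(),
            |mut store| {
                store.gc(&GarbageCollectionOptions {
                    target: GarbageCollectionTarget::DropAtLeastFraction(0.3),
                    gc_timeless: true,
                    protect_latest: 1,
                    purge_empty_tables: false,
                    dont_protect: Default::default(),
                })
            },
            BatchSize::LargeInput,
        );
    });
}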
2 changes: 1 addition & 1 deletion crates/re_sdk/Cargo.toml
@@ -68,7 +68,7 @@ webbrowser = { workspace = true, optional = true }


[dev-dependencies]
-re_arrow_store = { workspace = true, features = ["testing"] }
+re_arrow_store.workspace = true

itertools.workspace = true
ndarray-rand.workspace = true
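(For reference: `re_arrow_store.workspace = true` is just TOML shorthand for `re_arrow_store = { workspace = true }` — the dependency itself stays, only the request for the now-deleted `testing` feature goes away.)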
26 changes: 26 additions & 0 deletions crates/re_types_core/src/loggable_batch.rs
@@ -89,6 +89,32 @@ impl<'a> std::ops::Deref for MaybeOwnedComponentBatch<'a> {
}
}

impl<'a> LoggableBatch for MaybeOwnedComponentBatch<'a> {
    type Name = ComponentName;

    #[inline]
    fn name(&self) -> Self::Name {
        self.as_ref().name()
    }

    #[inline]
    fn num_instances(&self) -> usize {
        self.as_ref().num_instances()
    }

    #[inline]
    fn arrow_field(&self) -> arrow2::datatypes::Field {
        self.as_ref().arrow_field()
    }

    #[inline]
    fn to_arrow(&self) -> SerializationResult<Box<dyn ::arrow2::array::Array>> {
        self.as_ref().to_arrow()
    }
}

impl<'a> ComponentBatch for MaybeOwnedComponentBatch<'a> {}
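// Among other things, this impl is what lets the new gc benchmark feed the
// `MaybeOwnedComponentBatch`es coming out of `AsComponents::as_component_batches`
// straight back in as `&dyn ComponentBatch` (see `build_table` in benches/gc.rs).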

// --- Unary ---

impl<L: Clone + Loggable> LoggableBatch for L {
Expand Down
Loading