Skip to content

Commit

Permalink
end-to-end batching 3: introduce DataRow and DataTable (#1673)
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc authored Mar 27, 2023
1 parent 97cf74c commit 8a37e50
Show file tree
Hide file tree
Showing 29 changed files with 1,487 additions and 851 deletions.
51 changes: 26 additions & 25 deletions crates/re_arrow_store/benches/data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ use re_arrow_store::{DataStore, DataStoreConfig, LatestAtQuery, RangeQuery, Time
use re_log_types::{
component_types::{InstanceKey, Rect2D},
datagen::{build_frame_nr, build_some_instances, build_some_rects},
msg_bundle::{try_build_msg_bundle2, MsgBundle},
Component as _, ComponentName, EntityPath, MsgId, TimeType, Timeline,
Component as _, ComponentName, DataRow, EntityPath, MsgId, TimeType, Timeline,
};

// ---
Expand All @@ -27,28 +26,30 @@ const NUM_RECTS: i64 = 1;

// --- Benchmarks ---

// TODO(cmc): need additional benches for full tables

fn insert(c: &mut Criterion) {
{
let msgs = build_messages(NUM_RECTS as usize);
let rows = build_rows(NUM_RECTS as usize);
let mut group = c.benchmark_group("datastore/insert/batch/rects");
group.throughput(criterion::Throughput::Elements(
(NUM_RECTS * NUM_FRAMES) as _,
));
group.bench_function("insert", |b| {
b.iter(|| insert_messages(Default::default(), InstanceKey::name(), msgs.iter()));
b.iter(|| insert_rows(Default::default(), InstanceKey::name(), rows.iter()));
});
}
}

fn latest_at_batch(c: &mut Criterion) {
{
let msgs = build_messages(NUM_RECTS as usize);
let store = insert_messages(Default::default(), InstanceKey::name(), msgs.iter());
let rows = build_rows(NUM_RECTS as usize);
let store = insert_rows(Default::default(), InstanceKey::name(), rows.iter());
let mut group = c.benchmark_group("datastore/latest_at/batch/rects");
group.throughput(criterion::Throughput::Elements(NUM_RECTS as _));
group.bench_function("query", |b| {
b.iter(|| {
let results = latest_messages_at(&store, Rect2D::name(), &[Rect2D::name()]);
let results = latest_data_at(&store, Rect2D::name(), &[Rect2D::name()]);
let rects = results[0]
.as_ref()
.unwrap()
Expand All @@ -70,27 +71,27 @@ fn latest_at_missing_components(c: &mut Criterion) {
};

{
let msgs = build_messages(NUM_RECTS as usize);
let store = insert_messages(config.clone(), InstanceKey::name(), msgs.iter());
let msgs = build_rows(NUM_RECTS as usize);
let store = insert_rows(config.clone(), InstanceKey::name(), msgs.iter());
let mut group = c.benchmark_group("datastore/latest_at/missing_components");
group.throughput(criterion::Throughput::Elements(NUM_RECTS as _));
group.bench_function("primary", |b| {
b.iter(|| {
let results =
latest_messages_at(&store, "non_existing_component".into(), &[Rect2D::name()]);
latest_data_at(&store, "non_existing_component".into(), &[Rect2D::name()]);
assert!(results[0].is_none());
});
});
}

{
let msgs = build_messages(NUM_RECTS as usize);
let store = insert_messages(config, InstanceKey::name(), msgs.iter());
let msgs = build_rows(NUM_RECTS as usize);
let store = insert_rows(config, InstanceKey::name(), msgs.iter());
let mut group = c.benchmark_group("datastore/latest_at/missing_components");
group.throughput(criterion::Throughput::Elements(NUM_RECTS as _));
group.bench_function("secondaries", |b| {
b.iter(|| {
let results = latest_messages_at(
let results = latest_data_at(
&store,
Rect2D::name(),
&[
Expand All @@ -109,15 +110,15 @@ fn latest_at_missing_components(c: &mut Criterion) {

fn range_batch(c: &mut Criterion) {
{
let msgs = build_messages(NUM_RECTS as usize);
let store = insert_messages(Default::default(), InstanceKey::name(), msgs.iter());
let msgs = build_rows(NUM_RECTS as usize);
let store = insert_rows(Default::default(), InstanceKey::name(), msgs.iter());
let mut group = c.benchmark_group("datastore/range/batch/rects");
group.throughput(criterion::Throughput::Elements(
(NUM_RECTS * NUM_FRAMES) as _,
));
group.bench_function("query", |b| {
b.iter(|| {
let msgs = range_messages(&store, [Rect2D::name()]);
let msgs = range_data(&store, [Rect2D::name()]);
for (cur_time, (time, results)) in msgs.enumerate() {
let time = time.unwrap();
assert_eq!(cur_time as i64, time.as_i64());
Expand Down Expand Up @@ -146,31 +147,31 @@ criterion_main!(benches);

// --- Helpers ---

fn build_messages(n: usize) -> Vec<MsgBundle> {
fn build_rows(n: usize) -> Vec<DataRow> {
(0..NUM_FRAMES)
.map(move |frame_idx| {
try_build_msg_bundle2(
MsgId::ZERO,
DataRow::from_cells2(
MsgId::random(),
"rects",
[build_frame_nr(frame_idx.into())],
n as _,
(build_some_instances(n), build_some_rects(n)),
)
.unwrap()
})
.collect()
}

fn insert_messages<'a>(
fn insert_rows<'a>(
config: DataStoreConfig,
cluster_key: ComponentName,
msgs: impl Iterator<Item = &'a MsgBundle>,
rows: impl Iterator<Item = &'a DataRow>,
) -> DataStore {
let mut store = DataStore::new(cluster_key, config);
msgs.for_each(|msg_bundle| store.insert_row(msg_bundle).unwrap());
rows.for_each(|row| store.insert_row(row).unwrap());
store
}

fn latest_messages_at<const N: usize>(
fn latest_data_at<const N: usize>(
store: &DataStore,
primary: ComponentName,
secondaries: &[ComponentName; N],
Expand All @@ -185,7 +186,7 @@ fn latest_messages_at<const N: usize>(
store.get(secondaries, &row_indices)
}

fn range_messages<const N: usize>(
fn range_data<const N: usize>(
store: &DataStore,
components: [ComponentName; N],
) -> impl Iterator<Item = (Option<TimeInt>, [Option<Box<dyn Array>>; N])> + '_ {
Expand Down
67 changes: 33 additions & 34 deletions crates/re_arrow_store/examples/dump_dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! POLARS_FMT_MAX_ROWS=100 cargo r -p re_arrow_store --example dump_dataframe
//! ```
use re_arrow_store::{test_bundle, DataStore};
use re_arrow_store::{test_row, DataStore};
use re_log_types::{
component_types::InstanceKey,
datagen::{
Expand All @@ -25,51 +25,50 @@ fn main() {
];

for ent_path in &ent_paths {
let bundle1 = test_bundle!(ent_path @ [
build_frame_nr(1.into()), build_log_time(Time::now()),
] => [build_some_instances(2), build_some_rects(2)]);
store.insert_row(&bundle1).unwrap();
let row1 = test_row!(ent_path @ [
build_frame_nr(1.into()), build_log_time(Time::now()),
] => 2; [build_some_instances(2), build_some_rects(2)]);
store.insert_row(&row1).unwrap();
}

for ent_path in &ent_paths {
let bundle2 = test_bundle!(ent_path @ [
build_frame_nr(2.into())
] => [build_some_instances(2), build_some_point2d(2)]);
store.insert_row(&bundle2).unwrap();
let row2 = test_row!(ent_path @ [
build_frame_nr(2.into())
] => 2; [build_some_instances(2), build_some_point2d(2)]);
store.insert_row(&row2).unwrap();
// Insert timelessly too!
let bundle2 =
test_bundle!(ent_path @ [] => [build_some_instances(2), build_some_point2d(2)]);
store.insert_row(&bundle2).unwrap();
let row2 = test_row!(ent_path @ [] => 2; [build_some_instances(2), build_some_point2d(2)]);
store.insert_row(&row2).unwrap();

let bundle3 = test_bundle!(ent_path @ [
build_frame_nr(3.into()), build_log_time(Time::now()),
] => [build_some_instances_from(25..29), build_some_point2d(4)]);
store.insert_row(&bundle3).unwrap();
let row3 = test_row!(ent_path @ [
build_frame_nr(3.into()), build_log_time(Time::now()),
] => 4; [build_some_instances_from(25..29), build_some_point2d(4)]);
store.insert_row(&row3).unwrap();
// Insert timelessly too!
let bundle3 = test_bundle!(ent_path @ [] => [build_some_instances_from(25..29), build_some_point2d(4)]);
store.insert_row(&bundle3).unwrap();
let row3 = test_row!(ent_path @ [] => 4; [build_some_instances_from(25..29), build_some_point2d(4)]);
store.insert_row(&row3).unwrap();
}

for ent_path in &ent_paths {
let bundle4_1 = test_bundle!(ent_path @ [
build_frame_nr(4.into()), build_log_time(Time::now()),
] => [build_some_instances_from(20..23), build_some_rects(3)]);
store.insert_row(&bundle4_1).unwrap();
let row4_1 = test_row!(ent_path @ [
build_frame_nr(4.into()), build_log_time(Time::now()),
] => 3; [build_some_instances_from(20..23), build_some_rects(3)]);
store.insert_row(&row4_1).unwrap();

let bundle4_15 = test_bundle!(ent_path @ [
build_frame_nr(4.into()),
] => [build_some_instances_from(20..23), build_some_point2d(3)]);
store.insert_row(&bundle4_15).unwrap();
let row4_15 = test_row!(ent_path @ [
build_frame_nr(4.into()),
] => 3; [build_some_instances_from(20..23), build_some_point2d(3)]);
store.insert_row(&row4_15).unwrap();

let bundle4_2 = test_bundle!(ent_path @ [
build_frame_nr(4.into()), build_log_time(Time::now()),
] => [build_some_instances_from(25..28), build_some_rects(3)]);
store.insert_row(&bundle4_2).unwrap();
let row4_2 = test_row!(ent_path @ [
build_frame_nr(4.into()), build_log_time(Time::now()),
] => 3; [build_some_instances_from(25..28), build_some_rects(3)]);
store.insert_row(&row4_2).unwrap();

let bundle4_25 = test_bundle!(ent_path @ [
build_frame_nr(4.into()), build_log_time(Time::now()),
] => [build_some_instances_from(25..28), build_some_point2d(3)]);
store.insert_row(&bundle4_25).unwrap();
let row4_25 = test_row!(ent_path @ [
build_frame_nr(4.into()), build_log_time(Time::now()),
] => 3; [build_some_instances_from(25..28), build_some_point2d(3)]);
store.insert_row(&row4_25).unwrap();
}

let df = store.to_dataframe();
Expand Down
10 changes: 5 additions & 5 deletions crates/re_arrow_store/examples/latest_component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//! ```
use re_arrow_store::polars_util::latest_component;
use re_arrow_store::{test_bundle, DataStore, LatestAtQuery, TimeType, Timeline};
use re_arrow_store::{test_row, DataStore, LatestAtQuery, TimeType, Timeline};
use re_log_types::component_types::Rect2D;
use re_log_types::datagen::build_some_rects;
use re_log_types::{
Expand All @@ -19,11 +19,11 @@ fn main() {

let ent_path = EntityPath::from("my/entity");

let bundle = test_bundle!(ent_path @ [build_frame_nr(2.into())] => [build_some_rects(4)]);
store.insert_row(&bundle).unwrap();
let row = test_row!(ent_path @ [build_frame_nr(2.into())] => 4; [build_some_rects(4)]);
store.insert_row(&row).unwrap();

let bundle = test_bundle!(ent_path @ [build_frame_nr(3.into())] => [build_some_point2d(2)]);
store.insert_row(&bundle).unwrap();
let row = test_row!(ent_path @ [build_frame_nr(3.into())] => 2; [build_some_point2d(2)]);
store.insert_row(&row).unwrap();

let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence);

Expand Down
10 changes: 5 additions & 5 deletions crates/re_arrow_store/examples/latest_components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
use polars_core::prelude::*;
use re_arrow_store::polars_util::latest_components;
use re_arrow_store::{test_bundle, DataStore, LatestAtQuery, TimeType, Timeline};
use re_arrow_store::{test_row, DataStore, LatestAtQuery, TimeType, Timeline};
use re_log_types::{
component_types::{InstanceKey, Point2D, Rect2D},
datagen::{build_frame_nr, build_some_point2d, build_some_rects},
Expand All @@ -18,11 +18,11 @@ fn main() {

let ent_path = EntityPath::from("my/entity");

let bundle = test_bundle!(ent_path @ [build_frame_nr(2.into())] => [build_some_rects(4)]);
store.insert_row(&bundle).unwrap();
let row = test_row!(ent_path @ [build_frame_nr(2.into())] => 4; [build_some_rects(4)]);
store.insert_row(&row).unwrap();

let bundle = test_bundle!(ent_path @ [build_frame_nr(3.into())] => [build_some_point2d(2)]);
store.insert_row(&bundle).unwrap();
let row = test_row!(ent_path @ [build_frame_nr(3.into())] => 2; [build_some_point2d(2)]);
store.insert_row(&row).unwrap();

let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence);
let df = latest_components(
Expand Down
30 changes: 15 additions & 15 deletions crates/re_arrow_store/examples/range_components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//! ```
use polars_core::prelude::JoinType;
use re_arrow_store::{polars_util, test_bundle, DataStore, RangeQuery, TimeRange};
use re_arrow_store::{polars_util, test_row, DataStore, RangeQuery, TimeRange};
use re_log_types::{
component_types::{InstanceKey, Point2D, Rect2D},
datagen::{build_frame_nr, build_some_point2d, build_some_rects},
Expand All @@ -22,26 +22,26 @@ fn main() {
let frame3 = 3.into();
let frame4 = 4.into();

let bundle = test_bundle!(ent_path @ [build_frame_nr(frame1)] => [build_some_rects(2)]);
store.insert_row(&bundle).unwrap();
let row = test_row!(ent_path @ [build_frame_nr(frame1)] => 2; [build_some_rects(2)]);
store.insert_row(&row).unwrap();

let bundle = test_bundle!(ent_path @ [build_frame_nr(frame2)] => [build_some_point2d(2)]);
store.insert_row(&bundle).unwrap();
let row = test_row!(ent_path @ [build_frame_nr(frame2)] => 2; [build_some_point2d(2)]);
store.insert_row(&row).unwrap();

let bundle = test_bundle!(ent_path @ [build_frame_nr(frame3)] => [build_some_point2d(4)]);
store.insert_row(&bundle).unwrap();
let row = test_row!(ent_path @ [build_frame_nr(frame3)] => 4; [build_some_point2d(4)]);
store.insert_row(&row).unwrap();

let bundle = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [build_some_rects(3)]);
store.insert_row(&bundle).unwrap();
let row = test_row!(ent_path @ [build_frame_nr(frame4)] => 3; [build_some_rects(3)]);
store.insert_row(&row).unwrap();

let bundle = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [build_some_point2d(1)]);
store.insert_row(&bundle).unwrap();
let row = test_row!(ent_path @ [build_frame_nr(frame4)] => 1; [build_some_point2d(1)]);
store.insert_row(&row).unwrap();

let bundle = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [build_some_rects(3)]);
store.insert_row(&bundle).unwrap();
let row = test_row!(ent_path @ [build_frame_nr(frame4)] => 3; [build_some_rects(3)]);
store.insert_row(&row).unwrap();

let bundle = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [build_some_point2d(3)]);
store.insert_row(&bundle).unwrap();
let row = test_row!(ent_path @ [build_frame_nr(frame4)] => 3; [build_some_point2d(3)]);
store.insert_row(&row).unwrap();

let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence);
let query = RangeQuery::new(timeline_frame_nr, TimeRange::new(2.into(), 4.into()));
Expand Down
2 changes: 2 additions & 0 deletions crates/re_arrow_store/src/store_polars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ use crate::{
PersistentIndexTable, RowIndex,
};

// TODO(#1692): all of this stuff should be defined by Data{Cell,Row,Table}, not the store.

// ---

impl DataStore {
Expand Down
Loading

2 comments on commit 8a37e50

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rust Benchmark

Benchmark suite Current: 8a37e50 Previous: 2a62c32 Ratio
datastore/insert/batch/rects/insert 607999 ns/iter (± 3501) 572984 ns/iter (± 7501) 1.06
datastore/latest_at/batch/rects/query 1847 ns/iter (± 7) 1863 ns/iter (± 20) 0.99
datastore/latest_at/missing_components/primary 289 ns/iter (± 5) 291 ns/iter (± 1) 0.99
datastore/latest_at/missing_components/secondaries 444 ns/iter (± 2) 441 ns/iter (± 4) 1.01
datastore/range/batch/rects/query 152928 ns/iter (± 776) 149108 ns/iter (± 2287) 1.03
mono_points_arrow/generate_message_bundles 43674525 ns/iter (± 2094613) 25169001 ns/iter (± 1805149) 1.74
mono_points_arrow/generate_messages 122673947 ns/iter (± 1466966) 117284996 ns/iter (± 3697177) 1.05
mono_points_arrow/encode_log_msg 153248336 ns/iter (± 1169571) 154301975 ns/iter (± 4092727) 0.99
mono_points_arrow/encode_total 322940902 ns/iter (± 2003700) 303138919 ns/iter (± 4530539) 1.07
mono_points_arrow/decode_log_msg 176465542 ns/iter (± 1591891) 173788667 ns/iter (± 3025754) 1.02
mono_points_arrow/decode_message_bundles 53000928 ns/iter (± 931076) 53688235 ns/iter (± 2016588) 0.99
mono_points_arrow/decode_total 227298270 ns/iter (± 1971502) 227657720 ns/iter (± 2840472) 1.00
batch_points_arrow/generate_message_bundles 285537 ns/iter (± 2423) 320827 ns/iter (± 1764) 0.89
batch_points_arrow/generate_messages 6018 ns/iter (± 80) 5890 ns/iter (± 75) 1.02
batch_points_arrow/encode_log_msg 376585 ns/iter (± 2817) 345685 ns/iter (± 6489) 1.09
batch_points_arrow/encode_total 690810 ns/iter (± 3373) 686185 ns/iter (± 10500) 1.01
batch_points_arrow/decode_log_msg 350176 ns/iter (± 1466) 344619 ns/iter (± 2619) 1.02
batch_points_arrow/decode_message_bundles 1613 ns/iter (± 33) 1549 ns/iter (± 28) 1.04
batch_points_arrow/decode_total 358649 ns/iter (± 2838) 342702 ns/iter (± 3139) 1.05
arrow_mono_points/insert 6120830809 ns/iter (± 19096063) 6213809978 ns/iter (± 32979882) 0.99
arrow_mono_points/query 1721371 ns/iter (± 18940) 1764029 ns/iter (± 38395) 0.98
arrow_batch_points/insert 3013481 ns/iter (± 30029) 2999923 ns/iter (± 55625) 1.00
arrow_batch_points/query 15257 ns/iter (± 209) 16318 ns/iter (± 254) 0.93
arrow_batch_vecs/insert 43113 ns/iter (± 440) 44353 ns/iter (± 584) 0.97
arrow_batch_vecs/query 469903 ns/iter (± 5425) 384226 ns/iter (± 5361) 1.22
tuid/Tuid::random 34 ns/iter (± 0) 34 ns/iter (± 0) 1

This comment was automatically generated by workflow using github-action-benchmark.

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark 'Rust Benchmark'.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 1.50.

Benchmark suite Current: 8a37e50 Previous: 2a62c32 Ratio
mono_points_arrow/generate_message_bundles 43674525 ns/iter (± 2094613) 25169001 ns/iter (± 1805149) 1.74

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.