Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc committed Mar 24, 2023
1 parent d33cd0f commit 89c0780
Show file tree
Hide file tree
Showing 22 changed files with 842 additions and 719 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

58 changes: 26 additions & 32 deletions crates/re_data_store/examples/memory_usage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,23 +105,20 @@ fn log_messages() {

{
let used_bytes_start = live_bytes();
let msg_bundle = Box::new(
DataTable::from_rows(
MsgId::ZERO, // not used (yet)
[DataRow::from_cells1(
MsgId::random(),
entity_path!("points"),
[build_frame_nr(0.into())],
1,
build_some_point2d(1),
)],
)
.into_msg_bundle(),
);
let msg_bundle_bytes = live_bytes() - used_bytes_start;
let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(*msg_bundle).unwrap()));
let table = Box::new(DataTable::from_rows(
MsgId::ZERO, // not used (yet)
[DataRow::from_cells1(
MsgId::random(),
entity_path!("points"),
[build_frame_nr(0.into())],
1,
build_some_point2d(1),
)],
));
let table_bytes = live_bytes() - used_bytes_start;
let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(*table).unwrap()));
let log_msg_bytes = live_bytes() - used_bytes_start;
println!("Arrow MsgBundle containing a Pos2 uses {msg_bundle_bytes} bytes in RAM");
println!("Arrow payload containing a Pos2 uses {table_bytes} bytes in RAM");
let encoded = encode_log_msg(&log_msg);
println!(
"Arrow LogMsg containing a Pos2 uses {}-{log_msg_bytes} bytes in RAM, and {} bytes encoded",
Expand All @@ -131,23 +128,20 @@ fn log_messages() {

{
let used_bytes_start = live_bytes();
let msg_bundle = Box::new(
DataTable::from_rows(
MsgId::ZERO, // not used (yet)
[DataRow::from_cells1(
MsgId::random(),
entity_path!("points"),
[build_frame_nr(0.into())],
NUM_POINTS as _,
build_some_point2d(NUM_POINTS),
)],
)
.into_msg_bundle(),
);
let msg_bundle_bytes = live_bytes() - used_bytes_start;
let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(*msg_bundle).unwrap()));
let table = Box::new(DataTable::from_rows(
MsgId::ZERO, // not used (yet)
[DataRow::from_cells1(
MsgId::random(),
entity_path!("points"),
[build_frame_nr(0.into())],
NUM_POINTS as _,
build_some_point2d(NUM_POINTS),
)],
));
let table_bytes = live_bytes() - used_bytes_start;
let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(*table).unwrap()));
let log_msg_bytes = live_bytes() - used_bytes_start;
println!("Arrow MsgBundle containing a Pos2 uses {msg_bundle_bytes} bytes in RAM");
println!("Arrow payload containing a Pos2 uses {table_bytes} bytes in RAM");
let encoded = encode_log_msg(&log_msg);
println!(
"Arrow LogMsg containing {NUM_POINTS}x Pos2 uses {}-{log_msg_bytes} bytes in RAM, and {} bytes encoded",
Expand Down
5 changes: 0 additions & 5 deletions crates/re_data_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ pub use entity_tree::*;
pub use instance_path::*;
pub use log_db::LogDb;

use re_log_types::msg_bundle;

#[cfg(feature = "serde")]
pub use editable_auto_value::EditableAutoValue;
pub use re_log_types::{ComponentName, EntityPath, EntityPathPart, Index, TimeInt, Timeline};
Expand All @@ -30,9 +28,6 @@ pub use re_log_types::{ComponentName, EntityPath, EntityPathPart, Index, TimeInt
/// or how the logging SDK is being used (PEBKAC).
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error(transparent)]
MsgBundleError(#[from] msg_bundle::MsgBundleError),

#[error(transparent)]
WriteError(#[from] re_arrow_store::WriteError),
}
Expand Down
24 changes: 16 additions & 8 deletions crates/re_data_store/src/log_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ use nohash_hasher::IntMap;
use re_arrow_store::{DataStoreConfig, GarbageCollectionTarget, TimeInt};
use re_log_types::{
component_types::InstanceKey,
external::arrow2_convert::deserialize::arrow_array_deserialize_iterator, msg_bundle::MsgBundle,
ArrowMsg, BeginRecordingMsg, Component as _, ComponentPath, DataCell, DataRow, DataTable,
EntityPath, EntityPathHash, EntityPathOpMsg, LogMsg, MsgId, PathOp, RecordingId, RecordingInfo,
TimePoint, Timeline,
external::arrow2_convert::deserialize::arrow_array_deserialize_iterator, ArrowMsg,
BeginRecordingMsg, Component as _, ComponentPath, DataCell, DataRow, DataTable, EntityPath,
EntityPathHash, EntityPathOpMsg, LogMsg, MsgId, PathOp, RecordingId, RecordingInfo, TimePoint,
Timeline,
};

use crate::{Error, TimesPerTimeline};
Expand Down Expand Up @@ -77,8 +77,7 @@ impl EntityDb {
}

fn try_add_arrow_data_msg(&mut self, msg: &ArrowMsg) -> Result<(), Error> {
let msg_bundle = MsgBundle::try_from(msg).map_err(Error::MsgBundleError)?;
let table = DataTable::from_msg_bundle(msg_bundle);
let table: DataTable = msg.try_into().unwrap(); // TODO

// TODO(#1619): batch all of this
for row in table.as_rows() {
Expand Down Expand Up @@ -229,8 +228,10 @@ impl LogDb {
self.log_messages.is_empty()
}

// TODO: drop LogMsg storage
pub fn add(&mut self, msg: LogMsg) -> Result<(), Error> {
crate::profile_function!();

match &msg {
LogMsg::BeginRecordingMsg(msg) => self.add_begin_recording_msg(msg),
LogMsg::EntityPathOpMsg(msg) => {
Expand All @@ -241,13 +242,20 @@ impl LogDb {
} = msg;
self.entity_db.add_path_op(*msg_id, time_point, path_op);
}
LogMsg::ArrowMsg(msg) => {
self.entity_db.try_add_arrow_data_msg(msg)?;
LogMsg::ArrowMsg(inner) => {
self.entity_db.try_add_arrow_data_msg(inner)?;
// TODO: oh boy, that thing again
self.chronological_message_ids.extend(msg.ids());
self.log_messages
.extend(msg.ids().into_iter().map(|msg_id| (msg_id, msg.clone())));
return Ok(());
}
LogMsg::Goodbye(_) => {}
}

self.chronological_message_ids.push(msg.id());
self.log_messages.insert(msg.id(), msg);

Ok(())
}

Expand Down
3 changes: 2 additions & 1 deletion crates/re_log_types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ re_tuid.workspace = true
# External
ahash.workspace = true
array-init = "2.1.0"
arrow2 = { workspace = true, features = ["io_ipc", "io_print"] }
arrow2 = { workspace = true, features = ["io_ipc", "io_print", "compute_concatenate"] }
arrow2_convert.workspace = true
bytemuck = "1.11"
document-features = "0.2"
Expand All @@ -72,6 +72,7 @@ ndarray.workspace = true
nohash-hasher = "0.2"
num-derive = "0.3"
num-traits = "0.2"
smallvec = "1.10"
thiserror.workspace = true
time = { workspace = true, default-features = false, features = [
"formatting",
Expand Down
34 changes: 15 additions & 19 deletions crates/re_log_types/benches/msg_encode_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;

use re_log_types::{
datagen::{build_frame_nr, build_some_colors, build_some_point2d},
entity_path,
msg_bundle::MsgBundle,
ArrowMsg, DataRow, DataTable, Index, LogMsg, MsgId,
entity_path, ArrowMsg, DataRow, DataTable, Index, LogMsg, MsgId,
};

use criterion::{criterion_group, criterion_main, Criterion};
Expand Down Expand Up @@ -39,19 +37,19 @@ fn decode_log_msgs(mut bytes: &[u8]) -> Vec<LogMsg> {
messages
}

fn generate_messages(bundles: &[MsgBundle]) -> Vec<LogMsg> {
bundles
fn generate_messages(tables: &[DataTable]) -> Vec<LogMsg> {
tables
.iter()
.map(|bundle| LogMsg::ArrowMsg(ArrowMsg::try_from(bundle.clone()).unwrap()))
.map(|table| LogMsg::ArrowMsg(ArrowMsg::try_from(table.clone()).unwrap()))
.collect()
}

fn decode_message_bundles(messages: &[LogMsg]) -> Vec<MsgBundle> {
fn decode_message_bundles(messages: &[LogMsg]) -> Vec<DataTable> {
messages
.iter()
.map(|log_msg| {
if let LogMsg::ArrowMsg(arrow_msg) = log_msg {
MsgBundle::try_from(arrow_msg).unwrap()
DataTable::try_from(arrow_msg).unwrap()
} else {
unreachable!()
}
Expand All @@ -60,7 +58,7 @@ fn decode_message_bundles(messages: &[LogMsg]) -> Vec<MsgBundle> {
}

fn mono_points_arrow(c: &mut Criterion) {
fn generate_message_bundles() -> Vec<MsgBundle> {
fn generate_tables() -> Vec<DataTable> {
(0..NUM_POINTS)
.map(|i| {
DataTable::from_rows(
Expand All @@ -73,7 +71,6 @@ fn mono_points_arrow(c: &mut Criterion) {
(build_some_point2d(1), build_some_colors(1)),
)],
)
.into_msg_bundle()
})
.collect()
}
Expand All @@ -82,9 +79,9 @@ fn mono_points_arrow(c: &mut Criterion) {
let mut group = c.benchmark_group("mono_points_arrow");
group.throughput(criterion::Throughput::Elements(NUM_POINTS as _));
group.bench_function("generate_message_bundles", |b| {
b.iter(generate_message_bundles);
b.iter(generate_tables);
});
let bundles = generate_message_bundles();
let bundles = generate_tables();
group.bench_function("generate_messages", |b| {
b.iter(|| generate_messages(&bundles));
});
Expand All @@ -93,7 +90,7 @@ fn mono_points_arrow(c: &mut Criterion) {
b.iter(|| encode_log_msgs(&messages));
});
group.bench_function("encode_total", |b| {
b.iter(|| encode_log_msgs(&generate_messages(&generate_message_bundles())));
b.iter(|| encode_log_msgs(&generate_messages(&generate_tables())));
});

let encoded = encode_log_msgs(&messages);
Expand All @@ -118,7 +115,7 @@ fn mono_points_arrow(c: &mut Criterion) {
}

fn batch_points_arrow(c: &mut Criterion) {
fn generate_message_bundles() -> Vec<MsgBundle> {
fn generate_tables() -> Vec<DataTable> {
vec![DataTable::from_rows(
MsgId::ZERO,
[DataRow::from_cells2(
Expand All @@ -131,17 +128,16 @@ fn batch_points_arrow(c: &mut Criterion) {
build_some_colors(NUM_POINTS),
),
)],
)
.into_msg_bundle()]
)]
}

{
let mut group = c.benchmark_group("batch_points_arrow");
group.throughput(criterion::Throughput::Elements(NUM_POINTS as _));
group.bench_function("generate_message_bundles", |b| {
b.iter(generate_message_bundles);
b.iter(generate_tables);
});
let bundles = generate_message_bundles();
let bundles = generate_tables();
group.bench_function("generate_messages", |b| {
b.iter(|| generate_messages(&bundles));
});
Expand All @@ -150,7 +146,7 @@ fn batch_points_arrow(c: &mut Criterion) {
b.iter(|| encode_log_msgs(&messages));
});
group.bench_function("encode_total", |b| {
b.iter(|| encode_log_msgs(&generate_messages(&generate_message_bundles())));
b.iter(|| encode_log_msgs(&generate_messages(&generate_tables())));
});

let encoded = encode_log_msgs(&messages);
Expand Down
Loading

1 comment on commit 89c0780

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rust Benchmark

Benchmark suite Current: 89c0780 Previous: 9f08910 Ratio
datastore/insert/batch/rects/insert 504650 ns/iter (± 1575) 535786 ns/iter (± 7633) 0.94
datastore/latest_at/batch/rects/query 1837 ns/iter (± 12) 1889 ns/iter (± 21) 0.97
datastore/latest_at/missing_components/primary 287 ns/iter (± 1) 286 ns/iter (± 2) 1.00
datastore/latest_at/missing_components/secondaries 438 ns/iter (± 2) 426 ns/iter (± 5) 1.03
datastore/range/batch/rects/query 153027 ns/iter (± 1043) 144856 ns/iter (± 2172) 1.06
mono_points_arrow/generate_message_bundles 25835867 ns/iter (± 768622) 45850980 ns/iter (± 1850685) 0.56
mono_points_arrow/generate_messages 145717783 ns/iter (± 1265280) 134893342 ns/iter (± 1598268) 1.08
mono_points_arrow/encode_log_msg 180275616 ns/iter (± 1149515) 162120553 ns/iter (± 1786434) 1.11
mono_points_arrow/encode_total 349287690 ns/iter (± 1422502) 346154791 ns/iter (± 3020593) 1.01
mono_points_arrow/decode_log_msg 211161589 ns/iter (± 861849) 179423833 ns/iter (± 2267808) 1.18
mono_points_arrow/decode_message_bundles 73279354 ns/iter (± 618683) 70610602 ns/iter (± 1360115) 1.04
mono_points_arrow/decode_total 282081351 ns/iter (± 1468152) 246521038 ns/iter (± 2980571) 1.14
batch_points_arrow/generate_message_bundles 186681 ns/iter (± 324) 341039 ns/iter (± 1517) 0.55
batch_points_arrow/generate_messages 7036 ns/iter (± 22) 6231 ns/iter (± 71) 1.13
batch_points_arrow/encode_log_msg 280339 ns/iter (± 936) 359122 ns/iter (± 2864) 0.78
batch_points_arrow/encode_total 503055 ns/iter (± 14094) 711662 ns/iter (± 9472) 0.71
batch_points_arrow/decode_log_msg 216599 ns/iter (± 681) 340495 ns/iter (± 3541) 0.64
batch_points_arrow/decode_message_bundles 2469 ns/iter (± 13) 1941 ns/iter (± 21) 1.27
batch_points_arrow/decode_total 225978 ns/iter (± 661) 349205 ns/iter (± 3286) 0.65
arrow_mono_points/insert 3701572071 ns/iter (± 16917001) 6929735477 ns/iter (± 28837413) 0.53
arrow_mono_points/query 1759655 ns/iter (± 11762) 1780043 ns/iter (± 27911) 0.99
arrow_batch_points/insert 1760821 ns/iter (± 6695) 2632132 ns/iter (± 24682) 0.67
arrow_batch_points/query 16804 ns/iter (± 149) 16196 ns/iter (± 91) 1.04
arrow_batch_vecs/insert 31593 ns/iter (± 133) 41452 ns/iter (± 406) 0.76
arrow_batch_vecs/query 451635 ns/iter (± 1253) 386691 ns/iter (± 4645) 1.17
tuid/Tuid::random 34 ns/iter (± 0) 34 ns/iter (± 0) 1

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.