end-to-end batching 4: retire MsgBundle + batching support in transport layer (#1679)

* Batching support at the transport layer

* fmt

* woop

* self review

* doctest whining

* oh cmon

* addressing PR comments

* 0-indexed rows in examples
teh-cmc authored Mar 29, 2023
1 parent 1c12938 commit d3b459f
Showing 23 changed files with 992 additions and 785 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

52 changes: 23 additions & 29 deletions crates/re_data_store/examples/memory_usage.rs
@@ -48,7 +48,7 @@ fn live_bytes() -> usize {
 
 // ----------------------------------------------------------------------------
 
-use re_log_types::{entity_path, DataRow, DataTable, MsgId};
+use re_log_types::{entity_path, DataRow, MsgId};
 
 fn main() {
     log_messages();
@@ -105,23 +105,20 @@ fn log_messages() {
 
     {
         let used_bytes_start = live_bytes();
-        let msg_bundle = Box::new(
-            DataTable::from_rows(
-                MsgId::ZERO, // not used (yet)
-                [DataRow::from_cells1(
-                    MsgId::random(),
-                    entity_path!("points"),
-                    [build_frame_nr(0.into())],
-                    1,
-                    build_some_point2d(1),
-                )],
+        let table = Box::new(
+            DataRow::from_cells1(
+                MsgId::random(),
+                entity_path!("points"),
+                [build_frame_nr(0.into())],
+                1,
+                build_some_point2d(1),
             )
-            .into_msg_bundle(),
+            .into_table(),
         );
-        let msg_bundle_bytes = live_bytes() - used_bytes_start;
-        let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(*msg_bundle).unwrap()));
+        let table_bytes = live_bytes() - used_bytes_start;
+        let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(&*table).unwrap()));
         let log_msg_bytes = live_bytes() - used_bytes_start;
-        println!("Arrow MsgBundle containing a Pos2 uses {msg_bundle_bytes} bytes in RAM");
+        println!("Arrow payload containing a Pos2 uses {table_bytes} bytes in RAM");
         let encoded = encode_log_msg(&log_msg);
         println!(
             "Arrow LogMsg containing a Pos2 uses {}-{log_msg_bytes} bytes in RAM, and {} bytes encoded",
@@ -131,23 +128,20 @@
 
     {
         let used_bytes_start = live_bytes();
-        let msg_bundle = Box::new(
-            DataTable::from_rows(
-                MsgId::ZERO, // not used (yet)
-                [DataRow::from_cells1(
-                    MsgId::random(),
-                    entity_path!("points"),
-                    [build_frame_nr(0.into())],
-                    NUM_POINTS as _,
-                    build_some_point2d(NUM_POINTS),
-                )],
+        let table = Box::new(
+            DataRow::from_cells1(
+                MsgId::random(),
+                entity_path!("points"),
+                [build_frame_nr(0.into())],
+                NUM_POINTS as _,
+                build_some_point2d(NUM_POINTS),
             )
-            .into_msg_bundle(),
+            .into_table(),
         );
-        let msg_bundle_bytes = live_bytes() - used_bytes_start;
-        let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(*msg_bundle).unwrap()));
+        let table_bytes = live_bytes() - used_bytes_start;
+        let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(&*table).unwrap()));
         let log_msg_bytes = live_bytes() - used_bytes_start;
-        println!("Arrow MsgBundle containing a Pos2 uses {msg_bundle_bytes} bytes in RAM");
+        println!("Arrow payload containing a Pos2 uses {table_bytes} bytes in RAM");
         let encoded = encode_log_msg(&log_msg);
         println!(
             "Arrow LogMsg containing {NUM_POINTS}x Pos2 uses {}-{log_msg_bytes} bytes in RAM, and {} bytes encoded",
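The example above captures the heart of this PR: a `DataRow` no longer detours through a `MsgBundle` but is promoted to a `DataTable` with `into_table()`, and the table as a whole becomes the transport payload. A minimal sketch of that flow, assuming the `build_*` helpers live in `re_log_types::datagen` and that `ArrowMsg` implements `TryFrom<&DataTable>` as the diff suggests:

```rust
// Sketch only: mirrors the calls visible in the diff above; the module path of
// the `build_*` helpers is an assumption.
use re_log_types::datagen::{build_frame_nr, build_some_point2d};
use re_log_types::{entity_path, ArrowMsg, DataRow, LogMsg, MsgId};

fn build_log_msg() -> LogMsg {
    // One row of data: a single Point2D component on the "points" entity at frame 0.
    let row = DataRow::from_cells1(
        MsgId::random(),
        entity_path!("points"),
        [build_frame_nr(0.into())],
        1, // number of instances
        build_some_point2d(1),
    );

    // The row becomes a (single-row) DataTable -- the unit the transport
    // layer now knows how to batch and ship.
    let table = row.into_table();

    // ArrowMsg serializes the whole table for transport.
    LogMsg::ArrowMsg(ArrowMsg::try_from(&table).expect("Arrow serialization failed"))
}
```

The same `DataTable` type can hold many rows, which is what makes the end-to-end batching this PR series is building towards possible.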
7 changes: 3 additions & 4 deletions crates/re_data_store/src/lib.rs
@@ -16,10 +16,9 @@ pub use entity_tree::*;
 pub use instance_path::*;
 pub use log_db::LogDb;
 
-use re_log_types::msg_bundle;
-
 #[cfg(feature = "serde")]
 pub use editable_auto_value::EditableAutoValue;
+use re_log_types::DataTableError;
 pub use re_log_types::{ComponentName, EntityPath, EntityPathPart, Index, TimeInt, Timeline};
 
// ----------------------------------------------------------------------------
@@ -30,8 +29,8 @@ pub use re_log_types::{ComponentName, EntityPath, EntityPathPart, Index, TimeInt
 /// or how the logging SDK is being used (PEBKAC).
 #[derive(thiserror::Error, Debug)]
 pub enum Error {
-    #[error(transparent)]
-    MsgBundleError(#[from] msg_bundle::MsgBundleError),
+    #[error("Error with one the underlying data table")]
+    DataTable(#[from] DataTableError),
 
     #[error(transparent)]
     WriteError(#[from] re_arrow_store::WriteError),
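The error plumbing follows the usual thiserror pattern: `#[from] DataTableError` makes the derive generate a `From` impl, so any fallible table operation can be propagated with `?` and is wrapped into the store's `Error` automatically. A self-contained sketch of that mechanism, using simplified stand-in types rather than the real rerun ones:

```rust
// Minimal illustration of the `#[from]` error-wiring pattern above.
// `DataTableError` and `StoreError` here are stand-ins, not the rerun types.
use thiserror::Error;

#[derive(Error, Debug)]
pub enum DataTableError {
    #[error("unknown component: {0}")]
    UnknownComponent(String),
}

#[derive(Error, Debug)]
pub enum StoreError {
    // `#[from]` makes thiserror emit `impl From<DataTableError> for StoreError`,
    // which is what lets `?` convert the error automatically.
    #[error("Error with one of the underlying data tables")]
    DataTable(#[from] DataTableError),
}

fn parse_table() -> Result<(), DataTableError> {
    Err(DataTableError::UnknownComponent("rect2d".into()))
}

fn add_msg() -> Result<(), StoreError> {
    parse_table()?; // DataTableError -> StoreError via the generated From impl
    Ok(())
}
```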
25 changes: 14 additions & 11 deletions crates/re_data_store/src/log_db.rs
@@ -3,10 +3,10 @@ use nohash_hasher::IntMap;
 use re_arrow_store::{DataStoreConfig, GarbageCollectionTarget, TimeInt};
 use re_log_types::{
     component_types::InstanceKey,
-    external::arrow2_convert::deserialize::arrow_array_deserialize_iterator, msg_bundle::MsgBundle,
-    ArrowMsg, BeginRecordingMsg, Component as _, ComponentPath, DataCell, DataRow, DataTable,
-    EntityPath, EntityPathHash, EntityPathOpMsg, LogMsg, MsgId, PathOp, RecordingId, RecordingInfo,
-    TimePoint, Timeline,
+    external::arrow2_convert::deserialize::arrow_array_deserialize_iterator, ArrowMsg,
+    BeginRecordingMsg, Component as _, ComponentPath, DataCell, DataRow, DataTable, EntityPath,
+    EntityPathHash, EntityPathOpMsg, LogMsg, MsgId, PathOp, RecordingId, RecordingInfo, TimePoint,
+    Timeline,
 };
 
 use crate::{Error, TimesPerTimeline};
@@ -76,9 +76,8 @@ impl EntityDb {
             .or_insert_with(|| entity_path.clone());
     }
 
-    fn try_add_arrow_data_msg(&mut self, msg: &ArrowMsg) -> Result<(), Error> {
-        let msg_bundle = MsgBundle::try_from(msg).map_err(Error::MsgBundleError)?;
-        let table = DataTable::from_msg_bundle(msg_bundle);
+    fn try_add_arrow_msg(&mut self, msg: &ArrowMsg) -> Result<(), Error> {
+        let table: DataTable = msg.try_into()?;
 
         // TODO(#1619): batch all of this
         for row in table.as_rows() {
@@ -95,7 +94,7 @@
 
             self.register_entity_path(&row.entity_path);
 
-            for cell in row.cells() {
+            for cell in row.cells().iter() {
                 let component_path =
                     ComponentPath::new(row.entity_path().clone(), cell.component_name());
                 if cell.component_name() == MsgId::name() {
@@ -233,6 +232,7 @@ impl LogDb {
 
     pub fn add(&mut self, msg: LogMsg) -> Result<(), Error> {
         crate::profile_function!();
+
         match &msg {
             LogMsg::BeginRecordingMsg(msg) => self.add_begin_recording_msg(msg),
             LogMsg::EntityPathOpMsg(msg) => {
@@ -243,13 +243,16 @@
                 } = msg;
                 self.entity_db.add_path_op(*msg_id, time_point, path_op);
             }
-            LogMsg::ArrowMsg(msg) => {
-                self.entity_db.try_add_arrow_data_msg(msg)?;
-            }
+            LogMsg::ArrowMsg(inner) => self.entity_db.try_add_arrow_msg(inner)?,
             LogMsg::Goodbye(_) => {}
         }
 
+        // TODO(#1619): the following only makes sense because, while we support sending and
+        // receiving batches, we don't actually do so yet.
+        // We need to stop storing raw `LogMsg`s before we can benefit from our batching.
         self.chronological_message_ids.push(msg.id());
         self.log_messages.insert(msg.id(), msg);
 
         Ok(())
     }

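For orientation, the ingestion path now decodes an `ArrowMsg` straight into a `DataTable` and still replays it row by row (the `TODO(#1619)` marks where true batched insertion will land later). Below is a rough sketch of that loop, reconstructed only from the calls visible in this diff; the error type of the `try_into` conversion is assumed to be `DataTableError`, and the actual store insertion and MsgId bookkeeping are elided:

```rust
// Sketch only: based on the calls shown in this diff; the real method also
// registers entity paths, handles MsgId bookkeeping, and inserts into the store.
use re_log_types::{ArrowMsg, Component as _, ComponentPath, DataTable, DataTableError, MsgId};

fn ingest(msg: &ArrowMsg) -> Result<(), DataTableError> {
    // The transport payload now decodes directly into a DataTable.
    let table: DataTable = msg.try_into()?;

    // For now the table is still replayed row by row (see TODO(#1619) above).
    for row in table.as_rows() {
        for cell in row.cells().iter() {
            // MsgId cells are control data; the real code special-cases them here.
            if cell.component_name() == MsgId::name() {
                continue;
            }
            let component_path =
                ComponentPath::new(row.entity_path().clone(), cell.component_name());
            // ...insert `cell` into the store under `component_path`...
            let _ = component_path;
        }
    }

    Ok(())
}
```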
7 changes: 6 additions & 1 deletion crates/re_log_types/Cargo.toml
@@ -60,7 +60,11 @@ re_tuid.workspace = true
 # External
 ahash.workspace = true
 array-init = "2.1.0"
-arrow2 = { workspace = true, features = ["io_ipc", "io_print"] }
+arrow2 = { workspace = true, features = [
+    "io_ipc",
+    "io_print",
+    "compute_concatenate",
+] }
 arrow2_convert.workspace = true
 bytemuck = "1.11"
 document-features = "0.2"
@@ -72,6 +76,7 @@ ndarray.workspace = true
 nohash-hasher = "0.2"
 num-derive = "0.3"
 num-traits = "0.2"
+smallvec = "1.10"
 thiserror.workspace = true
 time = { workspace = true, default-features = false, features = [
     "formatting",
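The newly enabled `compute_concatenate` feature exposes arrow2's concatenation kernel, the natural building block for merging per-row columns into one contiguous batched column; that reading is an inference from the PR's theme, not something the diff states. (`smallvec` is presumably used for small per-row cell vectors, though that usage isn't shown above.) A standalone example of the kernel itself, independent of the rerun crates:

```rust
// Plain arrow2: what the `compute_concatenate` feature unlocks.
use arrow2::array::{Array, Int32Array};
use arrow2::compute::concatenate::concatenate;

fn main() -> arrow2::error::Result<()> {
    // Two small columns, e.g. the same component coming from two different rows.
    let a = Int32Array::from_slice([1, 2, 3]);
    let b = Int32Array::from_slice([4, 5]);

    // Merge them into a single contiguous column, as a batcher would when
    // packing many rows into one table before shipping an ArrowMsg.
    let columns: Vec<&dyn Array> = vec![&a, &b];
    let merged = concatenate(&columns)?;

    assert_eq!(merged.len(), 5);
    Ok(())
}
```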

2 comments on commit d3b459f

@github-actions

Rust Benchmark

| Benchmark suite | Current: d3b459f | Previous: 1c12938 | Ratio |
| --- | --- | --- | --- |
| datastore/insert/batch/rects/insert | 613360 ns/iter (± 2450) | 593911 ns/iter (± 5519) | 1.03 |
| datastore/latest_at/batch/rects/query | 1854 ns/iter (± 12) | 1792 ns/iter (± 30) | 1.03 |
| datastore/latest_at/missing_components/primary | 285 ns/iter (± 1) | 274 ns/iter (± 6) | 1.04 |
| datastore/latest_at/missing_components/secondaries | 437 ns/iter (± 3) | 421 ns/iter (± 6) | 1.04 |
| datastore/range/batch/rects/query | 153084 ns/iter (± 1736) | 144420 ns/iter (± 2553) | 1.06 |
| mono_points_arrow/generate_message_bundles | 45095447 ns/iter (± 815265) | 38988125 ns/iter (± 1978229) | 1.16 |
| mono_points_arrow/generate_messages | 183869457 ns/iter (± 1497684) | 126675529 ns/iter (± 1989102) | 1.45 |
| mono_points_arrow/encode_log_msg | 222487012 ns/iter (± 1418038) | 156272521 ns/iter (± 3573050) | 1.42 |
| mono_points_arrow/encode_total | 449525904 ns/iter (± 2751440) | 329849052 ns/iter (± 3822145) | 1.36 |
| mono_points_arrow/decode_log_msg | 265744193 ns/iter (± 1871293) | 178154001 ns/iter (± 2223215) | 1.49 |
| mono_points_arrow/decode_message_bundles | 99878259 ns/iter (± 874456) | 53719099 ns/iter (± 1587126) | 1.86 |
| mono_points_arrow/decode_total | 364404996 ns/iter (± 2309100) | 232970399 ns/iter (± 3184196) | 1.56 |
| mono_points_arrow_batched/generate_message_bundles | 35268140 ns/iter (± 2123546) | | |
| mono_points_arrow_batched/generate_messages | 9356328 ns/iter (± 812413) | | |
| mono_points_arrow_batched/encode_log_msg | 1808729 ns/iter (± 8582) | | |
| mono_points_arrow_batched/encode_total | 49341341 ns/iter (± 1943977) | | |
| mono_points_arrow_batched/decode_log_msg | 991089 ns/iter (± 4752) | | |
| mono_points_arrow_batched/decode_message_bundles | 17540706 ns/iter (± 1258358) | | |
| mono_points_arrow_batched/decode_total | 20186464 ns/iter (± 1180501) | | |
| batch_points_arrow/generate_message_bundles | 279031 ns/iter (± 2259) | 267185 ns/iter (± 3173) | 1.04 |
| batch_points_arrow/generate_messages | 7659 ns/iter (± 96) | 5806 ns/iter (± 63) | 1.32 |
| batch_points_arrow/encode_log_msg | 391445 ns/iter (± 2031) | 369900 ns/iter (± 3214) | 1.06 |
| batch_points_arrow/encode_total | 697272 ns/iter (± 5100) | 656456 ns/iter (± 7786) | 1.06 |
| batch_points_arrow/decode_log_msg | 323104 ns/iter (± 1240) | 343418 ns/iter (± 3860) | 0.94 |
| batch_points_arrow/decode_message_bundles | 2859 ns/iter (± 27) | 1485 ns/iter (± 22) | 1.93 |
| batch_points_arrow/decode_total | 332868 ns/iter (± 2468) | 351741 ns/iter (± 3020) | 0.95 |
| arrow_mono_points/insert | 6918828701 ns/iter (± 18334367) | 6991458492 ns/iter (± 29665211) | 0.99 |
| arrow_mono_points/query | 1799250 ns/iter (± 16872) | 1644339 ns/iter (± 17752) | 1.09 |
| arrow_batch_points/insert | 3065257 ns/iter (± 19725) | 2904465 ns/iter (± 26689) | 1.06 |
| arrow_batch_points/query | 16274 ns/iter (± 34) | 16990 ns/iter (± 230) | 0.96 |
| arrow_batch_vecs/insert | 44550 ns/iter (± 633) | 41970 ns/iter (± 374) | 1.06 |
| arrow_batch_vecs/query | 531374 ns/iter (± 5794) | 479655 ns/iter (± 5078) | 1.11 |
| tuid/Tuid::random | 34 ns/iter (± 0) | 33 ns/iter (± 0) | 1.03 |

This comment was automatically generated by workflow using github-action-benchmark.

@github-actions

⚠️ Performance Alert ⚠️

A possible performance regression was detected for the 'Rust Benchmark' suite.
This commit's results are worse than the previous run's, exceeding the 1.50 ratio threshold.

| Benchmark suite | Current: d3b459f | Previous: 1c12938 | Ratio |
| --- | --- | --- | --- |
| mono_points_arrow/decode_message_bundles | 99878259 ns/iter (± 874456) | 53719099 ns/iter (± 1587126) | 1.86 |
| mono_points_arrow/decode_total | 364404996 ns/iter (± 2309100) | 232970399 ns/iter (± 3184196) | 1.56 |
| batch_points_arrow/decode_message_bundles | 2859 ns/iter (± 27) | 1485 ns/iter (± 22) | 1.93 |

This comment was automatically generated by workflow using github-action-benchmark.
