Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc committed Mar 27, 2023
1 parent dc1c47f commit eefab45
Show file tree
Hide file tree
Showing 23 changed files with 901 additions and 755 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

58 changes: 26 additions & 32 deletions crates/re_data_store/examples/memory_usage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,23 +105,20 @@ fn log_messages() {

{
let used_bytes_start = live_bytes();
let msg_bundle = Box::new(
DataTable::from_rows(
MsgId::ZERO, // not used (yet)
[DataRow::from_cells1(
MsgId::random(),
entity_path!("points"),
[build_frame_nr(0.into())],
1,
build_some_point2d(1),
)],
)
.into_msg_bundle(),
);
let msg_bundle_bytes = live_bytes() - used_bytes_start;
let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(*msg_bundle).unwrap()));
let table = Box::new(DataTable::from_rows(
MsgId::ZERO, // not used (yet)
[DataRow::from_cells1(
MsgId::random(),
entity_path!("points"),
[build_frame_nr(0.into())],
1,
build_some_point2d(1),
)],
));
let table_bytes = live_bytes() - used_bytes_start;
let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(&*table).unwrap()));
let log_msg_bytes = live_bytes() - used_bytes_start;
println!("Arrow MsgBundle containing a Pos2 uses {msg_bundle_bytes} bytes in RAM");
println!("Arrow payload containing a Pos2 uses {table_bytes} bytes in RAM");
let encoded = encode_log_msg(&log_msg);
println!(
"Arrow LogMsg containing a Pos2 uses {}-{log_msg_bytes} bytes in RAM, and {} bytes encoded",
Expand All @@ -131,23 +128,20 @@ fn log_messages() {

{
let used_bytes_start = live_bytes();
let msg_bundle = Box::new(
DataTable::from_rows(
MsgId::ZERO, // not used (yet)
[DataRow::from_cells1(
MsgId::random(),
entity_path!("points"),
[build_frame_nr(0.into())],
NUM_POINTS as _,
build_some_point2d(NUM_POINTS),
)],
)
.into_msg_bundle(),
);
let msg_bundle_bytes = live_bytes() - used_bytes_start;
let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(*msg_bundle).unwrap()));
let table = Box::new(DataTable::from_rows(
MsgId::ZERO, // not used (yet)
[DataRow::from_cells1(
MsgId::random(),
entity_path!("points"),
[build_frame_nr(0.into())],
NUM_POINTS as _,
build_some_point2d(NUM_POINTS),
)],
));
let table_bytes = live_bytes() - used_bytes_start;
let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(&*table).unwrap()));
let log_msg_bytes = live_bytes() - used_bytes_start;
println!("Arrow MsgBundle containing a Pos2 uses {msg_bundle_bytes} bytes in RAM");
println!("Arrow payload containing {NUM_POINTS}x Pos2 uses {table_bytes} bytes in RAM");
let encoded = encode_log_msg(&log_msg);
println!(
"Arrow LogMsg containing {NUM_POINTS}x Pos2 uses {}-{log_msg_bytes} bytes in RAM, and {} bytes encoded",
Expand Down
7 changes: 3 additions & 4 deletions crates/re_data_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ pub use entity_tree::*;
pub use instance_path::*;
pub use log_db::LogDb;

use re_log_types::msg_bundle;

#[cfg(feature = "serde")]
pub use editable_auto_value::EditableAutoValue;
use re_log_types::DataTableError;
pub use re_log_types::{ComponentName, EntityPath, EntityPathPart, Index, TimeInt, Timeline};

// ----------------------------------------------------------------------------
Expand All @@ -30,8 +29,8 @@ pub use re_log_types::{ComponentName, EntityPath, EntityPathPart, Index, TimeInt
/// or how the logging SDK is being used (PEBKAC).
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error(transparent)]
MsgBundleError(#[from] msg_bundle::MsgBundleError),
#[error("Error with one of the underlying data tables")]
DataTable(#[from] DataTableError),

#[error(transparent)]
WriteError(#[from] re_arrow_store::WriteError),
Expand Down
24 changes: 13 additions & 11 deletions crates/re_data_store/src/log_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ use nohash_hasher::IntMap;
use re_arrow_store::{DataStoreConfig, GarbageCollectionTarget, TimeInt};
use re_log_types::{
component_types::InstanceKey,
external::arrow2_convert::deserialize::arrow_array_deserialize_iterator, msg_bundle::MsgBundle,
ArrowMsg, BeginRecordingMsg, Component as _, ComponentPath, DataCell, DataRow, DataTable,
EntityPath, EntityPathHash, EntityPathOpMsg, LogMsg, MsgId, PathOp, RecordingId, RecordingInfo,
TimePoint, Timeline,
external::arrow2_convert::deserialize::arrow_array_deserialize_iterator, ArrowMsg,
BeginRecordingMsg, Component as _, ComponentPath, DataCell, DataRow, DataTable, EntityPath,
EntityPathHash, EntityPathOpMsg, LogMsg, MsgId, PathOp, RecordingId, RecordingInfo, TimePoint,
Timeline,
};

use crate::{Error, TimesPerTimeline};
Expand Down Expand Up @@ -76,9 +76,8 @@ impl EntityDb {
.or_insert_with(|| entity_path.clone());
}

fn try_add_arrow_data_msg(&mut self, msg: &ArrowMsg) -> Result<(), Error> {
let msg_bundle = MsgBundle::try_from(msg).map_err(Error::MsgBundleError)?;
let table = DataTable::from_msg_bundle(msg_bundle);
fn try_add_arrow_msg(&mut self, msg: &ArrowMsg) -> Result<(), Error> {
let table: DataTable = msg.try_into()?;

// TODO(#1619): batch all of this
for row in table.as_rows() {
Expand All @@ -95,7 +94,7 @@ impl EntityDb {

self.register_entity_path(&row.entity_path);

for cell in row.cells() {
for cell in row.cells().iter() {
let component_path =
ComponentPath::new(row.entity_path().clone(), cell.component_name());
if cell.component_name() == MsgId::name() {
Expand Down Expand Up @@ -231,8 +230,10 @@ impl LogDb {
self.log_messages.is_empty()
}

// TODO(#1619): stop storing raw LogMsgs.
pub fn add(&mut self, msg: LogMsg) -> Result<(), Error> {
crate::profile_function!();

match &msg {
LogMsg::BeginRecordingMsg(msg) => self.add_begin_recording_msg(msg),
LogMsg::EntityPathOpMsg(msg) => {
Expand All @@ -243,13 +244,14 @@ impl LogDb {
} = msg;
self.entity_db.add_path_op(*msg_id, time_point, path_op);
}
LogMsg::ArrowMsg(msg) => {
self.entity_db.try_add_arrow_data_msg(msg)?;
}
LogMsg::ArrowMsg(inner) => self.entity_db.try_add_arrow_msg(inner)?,
LogMsg::Goodbye(_) => {}
}

// TODO(cmc): The following simply doesn't make sense
self.chronological_message_ids.push(msg.id());
self.log_messages.insert(msg.id(), msg);

Ok(())
}

Expand Down
3 changes: 2 additions & 1 deletion crates/re_log_types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ re_tuid.workspace = true
# External
ahash.workspace = true
array-init = "2.1.0"
arrow2 = { workspace = true, features = ["io_ipc", "io_print"] }
arrow2 = { workspace = true, features = ["io_ipc", "io_print", "compute_concatenate"] }
arrow2_convert.workspace = true
bytemuck = "1.11"
document-features = "0.2"
Expand All @@ -72,6 +72,7 @@ ndarray.workspace = true
nohash-hasher = "0.2"
num-derive = "0.3"
num-traits = "0.2"
smallvec = "1.10"
thiserror.workspace = true
time = { workspace = true, default-features = false, features = [
"formatting",
Expand Down
34 changes: 15 additions & 19 deletions crates/re_log_types/benches/msg_encode_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;

use re_log_types::{
datagen::{build_frame_nr, build_some_colors, build_some_point2d},
entity_path,
msg_bundle::MsgBundle,
ArrowMsg, DataRow, DataTable, Index, LogMsg, MsgId,
entity_path, ArrowMsg, DataRow, DataTable, Index, LogMsg, MsgId,
};

use criterion::{criterion_group, criterion_main, Criterion};
Expand Down Expand Up @@ -39,19 +37,19 @@ fn decode_log_msgs(mut bytes: &[u8]) -> Vec<LogMsg> {
messages
}

fn generate_messages(bundles: &[MsgBundle]) -> Vec<LogMsg> {
bundles
fn generate_messages(tables: &[DataTable]) -> Vec<LogMsg> {
tables
.iter()
.map(|bundle| LogMsg::ArrowMsg(ArrowMsg::try_from(bundle.clone()).unwrap()))
.map(|table| LogMsg::ArrowMsg(ArrowMsg::try_from(table).unwrap()))
.collect()
}

fn decode_message_bundles(messages: &[LogMsg]) -> Vec<MsgBundle> {
fn decode_message_bundles(messages: &[LogMsg]) -> Vec<DataTable> {
messages
.iter()
.map(|log_msg| {
if let LogMsg::ArrowMsg(arrow_msg) = log_msg {
MsgBundle::try_from(arrow_msg).unwrap()
DataTable::try_from(arrow_msg).unwrap()
} else {
unreachable!()
}
Expand All @@ -60,7 +58,7 @@ fn decode_message_bundles(messages: &[LogMsg]) -> Vec<MsgBundle> {
}

fn mono_points_arrow(c: &mut Criterion) {
fn generate_message_bundles() -> Vec<MsgBundle> {
fn generate_tables() -> Vec<DataTable> {
(0..NUM_POINTS)
.map(|i| {
DataTable::from_rows(
Expand All @@ -73,7 +71,6 @@ fn mono_points_arrow(c: &mut Criterion) {
(build_some_point2d(1), build_some_colors(1)),
)],
)
.into_msg_bundle()
})
.collect()
}
Expand All @@ -82,9 +79,9 @@ fn mono_points_arrow(c: &mut Criterion) {
let mut group = c.benchmark_group("mono_points_arrow");
group.throughput(criterion::Throughput::Elements(NUM_POINTS as _));
group.bench_function("generate_message_bundles", |b| {
b.iter(generate_message_bundles);
b.iter(generate_tables);
});
let bundles = generate_message_bundles();
let bundles = generate_tables();
group.bench_function("generate_messages", |b| {
b.iter(|| generate_messages(&bundles));
});
Expand All @@ -93,7 +90,7 @@ fn mono_points_arrow(c: &mut Criterion) {
b.iter(|| encode_log_msgs(&messages));
});
group.bench_function("encode_total", |b| {
b.iter(|| encode_log_msgs(&generate_messages(&generate_message_bundles())));
b.iter(|| encode_log_msgs(&generate_messages(&generate_tables())));
});

let encoded = encode_log_msgs(&messages);
Expand All @@ -118,7 +115,7 @@ fn mono_points_arrow(c: &mut Criterion) {
}

fn batch_points_arrow(c: &mut Criterion) {
fn generate_message_bundles() -> Vec<MsgBundle> {
fn generate_tables() -> Vec<DataTable> {
vec![DataTable::from_rows(
MsgId::ZERO,
[DataRow::from_cells2(
Expand All @@ -131,17 +128,16 @@ fn batch_points_arrow(c: &mut Criterion) {
build_some_colors(NUM_POINTS),
),
)],
)
.into_msg_bundle()]
)]
}

{
let mut group = c.benchmark_group("batch_points_arrow");
group.throughput(criterion::Throughput::Elements(NUM_POINTS as _));
group.bench_function("generate_message_bundles", |b| {
b.iter(generate_message_bundles);
b.iter(generate_tables);
});
let bundles = generate_message_bundles();
let bundles = generate_tables();
group.bench_function("generate_messages", |b| {
b.iter(|| generate_messages(&bundles));
});
Expand All @@ -150,7 +146,7 @@ fn batch_points_arrow(c: &mut Criterion) {
b.iter(|| encode_log_msgs(&messages));
});
group.bench_function("encode_total", |b| {
b.iter(|| encode_log_msgs(&generate_messages(&generate_message_bundles())));
b.iter(|| encode_log_msgs(&generate_messages(&generate_tables())));
});

let encoded = encode_log_msgs(&messages);
Expand Down
Loading

0 comments on commit eefab45

Please sign in to comment.