Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc committed Mar 27, 2023
1 parent dc1c47f commit 34c2db8
Show file tree
Hide file tree
Showing 23 changed files with 790 additions and 741 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

58 changes: 26 additions & 32 deletions crates/re_data_store/examples/memory_usage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,23 +105,20 @@ fn log_messages() {

{
let used_bytes_start = live_bytes();
let msg_bundle = Box::new(
DataTable::from_rows(
MsgId::ZERO, // not used (yet)
[DataRow::from_cells1(
MsgId::random(),
entity_path!("points"),
[build_frame_nr(0.into())],
1,
build_some_point2d(1),
)],
)
.into_msg_bundle(),
);
let msg_bundle_bytes = live_bytes() - used_bytes_start;
let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(*msg_bundle).unwrap()));
let table = Box::new(DataTable::from_rows(
MsgId::ZERO, // not used (yet)
[DataRow::from_cells1(
MsgId::random(),
entity_path!("points"),
[build_frame_nr(0.into())],
1,
build_some_point2d(1),
)],
));
let table_bytes = live_bytes() - used_bytes_start;
let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(&*table).unwrap()));
let log_msg_bytes = live_bytes() - used_bytes_start;
println!("Arrow MsgBundle containing a Pos2 uses {msg_bundle_bytes} bytes in RAM");
println!("Arrow payload containing a Pos2 uses {table_bytes} bytes in RAM");
let encoded = encode_log_msg(&log_msg);
println!(
"Arrow LogMsg containing a Pos2 uses {}-{log_msg_bytes} bytes in RAM, and {} bytes encoded",
Expand All @@ -131,23 +128,20 @@ fn log_messages() {

{
let used_bytes_start = live_bytes();
let msg_bundle = Box::new(
DataTable::from_rows(
MsgId::ZERO, // not used (yet)
[DataRow::from_cells1(
MsgId::random(),
entity_path!("points"),
[build_frame_nr(0.into())],
NUM_POINTS as _,
build_some_point2d(NUM_POINTS),
)],
)
.into_msg_bundle(),
);
let msg_bundle_bytes = live_bytes() - used_bytes_start;
let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(*msg_bundle).unwrap()));
let table = Box::new(DataTable::from_rows(
MsgId::ZERO, // not used (yet)
[DataRow::from_cells1(
MsgId::random(),
entity_path!("points"),
[build_frame_nr(0.into())],
NUM_POINTS as _,
build_some_point2d(NUM_POINTS),
)],
));
let table_bytes = live_bytes() - used_bytes_start;
let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(&*table).unwrap()));
let log_msg_bytes = live_bytes() - used_bytes_start;
println!("Arrow MsgBundle containing a Pos2 uses {msg_bundle_bytes} bytes in RAM");
println!("Arrow payload containing a Pos2 uses {table_bytes} bytes in RAM");
let encoded = encode_log_msg(&log_msg);
println!(
"Arrow LogMsg containing {NUM_POINTS}x Pos2 uses {}-{log_msg_bytes} bytes in RAM, and {} bytes encoded",
Expand Down
5 changes: 0 additions & 5 deletions crates/re_data_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ pub use entity_tree::*;
pub use instance_path::*;
pub use log_db::LogDb;

use re_log_types::msg_bundle;

#[cfg(feature = "serde")]
pub use editable_auto_value::EditableAutoValue;
pub use re_log_types::{ComponentName, EntityPath, EntityPathPart, Index, TimeInt, Timeline};
Expand All @@ -30,9 +28,6 @@ pub use re_log_types::{ComponentName, EntityPath, EntityPathPart, Index, TimeInt
/// or how the logging SDK is being used (PEBKAC).
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error(transparent)]
MsgBundleError(#[from] msg_bundle::MsgBundleError),

#[error(transparent)]
WriteError(#[from] re_arrow_store::WriteError),
}
Expand Down
24 changes: 16 additions & 8 deletions crates/re_data_store/src/log_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ use nohash_hasher::IntMap;
use re_arrow_store::{DataStoreConfig, GarbageCollectionTarget, TimeInt};
use re_log_types::{
component_types::InstanceKey,
external::arrow2_convert::deserialize::arrow_array_deserialize_iterator, msg_bundle::MsgBundle,
ArrowMsg, BeginRecordingMsg, Component as _, ComponentPath, DataCell, DataRow, DataTable,
EntityPath, EntityPathHash, EntityPathOpMsg, LogMsg, MsgId, PathOp, RecordingId, RecordingInfo,
TimePoint, Timeline,
external::arrow2_convert::deserialize::arrow_array_deserialize_iterator, ArrowMsg,
BeginRecordingMsg, Component as _, ComponentPath, DataCell, DataRow, DataTable, EntityPath,
EntityPathHash, EntityPathOpMsg, LogMsg, MsgId, PathOp, RecordingId, RecordingInfo, TimePoint,
Timeline,
};

use crate::{Error, TimesPerTimeline};
Expand Down Expand Up @@ -77,8 +77,7 @@ impl EntityDb {
}

fn try_add_arrow_data_msg(&mut self, msg: &ArrowMsg) -> Result<(), Error> {
let msg_bundle = MsgBundle::try_from(msg).map_err(Error::MsgBundleError)?;
let table = DataTable::from_msg_bundle(msg_bundle);
let table: DataTable = msg.try_into().unwrap(); // TODO

// TODO(#1619): batch all of this
for row in table.as_rows() {
Expand Down Expand Up @@ -231,8 +230,10 @@ impl LogDb {
self.log_messages.is_empty()
}

// TODO: drop LogMsg storage
pub fn add(&mut self, msg: LogMsg) -> Result<(), Error> {
crate::profile_function!();

match &msg {
LogMsg::BeginRecordingMsg(msg) => self.add_begin_recording_msg(msg),
LogMsg::EntityPathOpMsg(msg) => {
Expand All @@ -243,13 +244,20 @@ impl LogDb {
} = msg;
self.entity_db.add_path_op(*msg_id, time_point, path_op);
}
LogMsg::ArrowMsg(msg) => {
self.entity_db.try_add_arrow_data_msg(msg)?;
LogMsg::ArrowMsg(inner) => {
self.entity_db.try_add_arrow_data_msg(inner)?;
// TODO: oh boy, that thing again
self.chronological_message_ids.extend(msg.ids());
self.log_messages
.extend(msg.ids().into_iter().map(|msg_id| (msg_id, msg.clone())));
return Ok(());
}
LogMsg::Goodbye(_) => {}
}

self.chronological_message_ids.push(msg.id());
self.log_messages.insert(msg.id(), msg);

Ok(())
}

Expand Down
3 changes: 2 additions & 1 deletion crates/re_log_types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ re_tuid.workspace = true
# External
ahash.workspace = true
array-init = "2.1.0"
arrow2 = { workspace = true, features = ["io_ipc", "io_print"] }
arrow2 = { workspace = true, features = ["io_ipc", "io_print", "compute_concatenate"] }
arrow2_convert.workspace = true
bytemuck = "1.11"
document-features = "0.2"
Expand All @@ -72,6 +72,7 @@ ndarray.workspace = true
nohash-hasher = "0.2"
num-derive = "0.3"
num-traits = "0.2"
smallvec = "1.10"
thiserror.workspace = true
time = { workspace = true, default-features = false, features = [
"formatting",
Expand Down
34 changes: 15 additions & 19 deletions crates/re_log_types/benches/msg_encode_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;

use re_log_types::{
datagen::{build_frame_nr, build_some_colors, build_some_point2d},
entity_path,
msg_bundle::MsgBundle,
ArrowMsg, DataRow, DataTable, Index, LogMsg, MsgId,
entity_path, ArrowMsg, DataRow, DataTable, Index, LogMsg, MsgId,
};

use criterion::{criterion_group, criterion_main, Criterion};
Expand Down Expand Up @@ -39,19 +37,19 @@ fn decode_log_msgs(mut bytes: &[u8]) -> Vec<LogMsg> {
messages
}

fn generate_messages(bundles: &[MsgBundle]) -> Vec<LogMsg> {
bundles
fn generate_messages(tables: &[DataTable]) -> Vec<LogMsg> {
tables
.iter()
.map(|bundle| LogMsg::ArrowMsg(ArrowMsg::try_from(bundle.clone()).unwrap()))
.map(|table| LogMsg::ArrowMsg(ArrowMsg::try_from(table.clone()).unwrap()))
.collect()
}

fn decode_message_bundles(messages: &[LogMsg]) -> Vec<MsgBundle> {
fn decode_message_bundles(messages: &[LogMsg]) -> Vec<DataTable> {
messages
.iter()
.map(|log_msg| {
if let LogMsg::ArrowMsg(arrow_msg) = log_msg {
MsgBundle::try_from(arrow_msg).unwrap()
DataTable::try_from(arrow_msg).unwrap()
} else {
unreachable!()
}
Expand All @@ -60,7 +58,7 @@ fn decode_message_bundles(messages: &[LogMsg]) -> Vec<MsgBundle> {
}

fn mono_points_arrow(c: &mut Criterion) {
fn generate_message_bundles() -> Vec<MsgBundle> {
fn generate_tables() -> Vec<DataTable> {
(0..NUM_POINTS)
.map(|i| {
DataTable::from_rows(
Expand All @@ -73,7 +71,6 @@ fn mono_points_arrow(c: &mut Criterion) {
(build_some_point2d(1), build_some_colors(1)),
)],
)
.into_msg_bundle()
})
.collect()
}
Expand All @@ -82,9 +79,9 @@ fn mono_points_arrow(c: &mut Criterion) {
let mut group = c.benchmark_group("mono_points_arrow");
group.throughput(criterion::Throughput::Elements(NUM_POINTS as _));
group.bench_function("generate_message_bundles", |b| {
b.iter(generate_message_bundles);
b.iter(generate_tables);
});
let bundles = generate_message_bundles();
let bundles = generate_tables();
group.bench_function("generate_messages", |b| {
b.iter(|| generate_messages(&bundles));
});
Expand All @@ -93,7 +90,7 @@ fn mono_points_arrow(c: &mut Criterion) {
b.iter(|| encode_log_msgs(&messages));
});
group.bench_function("encode_total", |b| {
b.iter(|| encode_log_msgs(&generate_messages(&generate_message_bundles())));
b.iter(|| encode_log_msgs(&generate_messages(&generate_tables())));
});

let encoded = encode_log_msgs(&messages);
Expand All @@ -118,7 +115,7 @@ fn mono_points_arrow(c: &mut Criterion) {
}

fn batch_points_arrow(c: &mut Criterion) {
fn generate_message_bundles() -> Vec<MsgBundle> {
fn generate_tables() -> Vec<DataTable> {
vec![DataTable::from_rows(
MsgId::ZERO,
[DataRow::from_cells2(
Expand All @@ -131,17 +128,16 @@ fn batch_points_arrow(c: &mut Criterion) {
build_some_colors(NUM_POINTS),
),
)],
)
.into_msg_bundle()]
)]
}

{
let mut group = c.benchmark_group("batch_points_arrow");
group.throughput(criterion::Throughput::Elements(NUM_POINTS as _));
group.bench_function("generate_message_bundles", |b| {
b.iter(generate_message_bundles);
b.iter(generate_tables);
});
let bundles = generate_message_bundles();
let bundles = generate_tables();
group.bench_function("generate_messages", |b| {
b.iter(|| generate_messages(&bundles));
});
Expand All @@ -150,7 +146,7 @@ fn batch_points_arrow(c: &mut Criterion) {
b.iter(|| encode_log_msgs(&messages));
});
group.bench_function("encode_total", |b| {
b.iter(|| encode_log_msgs(&generate_messages(&generate_message_bundles())));
b.iter(|| encode_log_msgs(&generate_messages(&generate_tables())));
});

let encoded = encode_log_msgs(&messages);
Expand Down
Loading

0 comments on commit 34c2db8

Please sign in to comment.