Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc committed Mar 24, 2023
1 parent d33cd0f commit 37b92b3
Show file tree
Hide file tree
Showing 21 changed files with 733 additions and 670 deletions.
58 changes: 26 additions & 32 deletions crates/re_data_store/examples/memory_usage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,23 +105,20 @@ fn log_messages() {

{
let used_bytes_start = live_bytes();
let msg_bundle = Box::new(
DataTable::from_rows(
MsgId::ZERO, // not used (yet)
[DataRow::from_cells1(
MsgId::random(),
entity_path!("points"),
[build_frame_nr(0.into())],
1,
build_some_point2d(1),
)],
)
.into_msg_bundle(),
);
let msg_bundle_bytes = live_bytes() - used_bytes_start;
let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(*msg_bundle).unwrap()));
let table = Box::new(DataTable::from_rows(
MsgId::ZERO, // not used (yet)
[DataRow::from_cells1(
MsgId::random(),
entity_path!("points"),
[build_frame_nr(0.into())],
1,
build_some_point2d(1),
)],
));
let table_bytes = live_bytes() - used_bytes_start;
let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(*table).unwrap()));
let log_msg_bytes = live_bytes() - used_bytes_start;
println!("Arrow MsgBundle containing a Pos2 uses {msg_bundle_bytes} bytes in RAM");
println!("Arrow payload containing a Pos2 uses {table_bytes} bytes in RAM");
let encoded = encode_log_msg(&log_msg);
println!(
"Arrow LogMsg containing a Pos2 uses {}-{log_msg_bytes} bytes in RAM, and {} bytes encoded",
Expand All @@ -131,23 +128,20 @@ fn log_messages() {

{
let used_bytes_start = live_bytes();
let msg_bundle = Box::new(
DataTable::from_rows(
MsgId::ZERO, // not used (yet)
[DataRow::from_cells1(
MsgId::random(),
entity_path!("points"),
[build_frame_nr(0.into())],
NUM_POINTS as _,
build_some_point2d(NUM_POINTS),
)],
)
.into_msg_bundle(),
);
let msg_bundle_bytes = live_bytes() - used_bytes_start;
let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(*msg_bundle).unwrap()));
let table = Box::new(DataTable::from_rows(
MsgId::ZERO, // not used (yet)
[DataRow::from_cells1(
MsgId::random(),
entity_path!("points"),
[build_frame_nr(0.into())],
NUM_POINTS as _,
build_some_point2d(NUM_POINTS),
)],
));
let table_bytes = live_bytes() - used_bytes_start;
let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(*table).unwrap()));
let log_msg_bytes = live_bytes() - used_bytes_start;
println!("Arrow MsgBundle containing a Pos2 uses {msg_bundle_bytes} bytes in RAM");
println!("Arrow payload containing a Pos2 uses {table_bytes} bytes in RAM");
let encoded = encode_log_msg(&log_msg);
println!(
"Arrow LogMsg containing {NUM_POINTS}x Pos2 uses {}-{log_msg_bytes} bytes in RAM, and {} bytes encoded",
Expand Down
5 changes: 0 additions & 5 deletions crates/re_data_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ pub use entity_tree::*;
pub use instance_path::*;
pub use log_db::LogDb;

use re_log_types::msg_bundle;

#[cfg(feature = "serde")]
pub use editable_auto_value::EditableAutoValue;
pub use re_log_types::{ComponentName, EntityPath, EntityPathPart, Index, TimeInt, Timeline};
Expand All @@ -30,9 +28,6 @@ pub use re_log_types::{ComponentName, EntityPath, EntityPathPart, Index, TimeInt
/// or how the logging SDK is being used (PEBKAC).
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error(transparent)]
MsgBundleError(#[from] msg_bundle::MsgBundleError),

#[error(transparent)]
WriteError(#[from] re_arrow_store::WriteError),
}
Expand Down
21 changes: 13 additions & 8 deletions crates/re_data_store/src/log_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ use nohash_hasher::IntMap;
use re_arrow_store::{DataStoreConfig, GarbageCollectionTarget, TimeInt};
use re_log_types::{
component_types::InstanceKey,
external::arrow2_convert::deserialize::arrow_array_deserialize_iterator, msg_bundle::MsgBundle,
ArrowMsg, BeginRecordingMsg, Component as _, ComponentPath, DataCell, DataRow, DataTable,
EntityPath, EntityPathHash, EntityPathOpMsg, LogMsg, MsgId, PathOp, RecordingId, RecordingInfo,
TimePoint, Timeline,
external::arrow2_convert::deserialize::arrow_array_deserialize_iterator, ArrowMsg,
BeginRecordingMsg, Component as _, ComponentPath, DataCell, DataRow, DataTable, EntityPath,
EntityPathHash, EntityPathOpMsg, LogMsg, MsgId, PathOp, RecordingId, RecordingInfo, TimePoint,
Timeline,
};

use crate::{Error, TimesPerTimeline};
Expand Down Expand Up @@ -77,8 +77,7 @@ impl EntityDb {
}

fn try_add_arrow_data_msg(&mut self, msg: &ArrowMsg) -> Result<(), Error> {
let msg_bundle = MsgBundle::try_from(msg).map_err(Error::MsgBundleError)?;
let table = DataTable::from_msg_bundle(msg_bundle);
let table: DataTable = msg.try_into().unwrap(); // TODO

// TODO(#1619): batch all of this
for row in table.as_rows() {
Expand Down Expand Up @@ -231,6 +230,7 @@ impl LogDb {

pub fn add(&mut self, msg: LogMsg) -> Result<(), Error> {
crate::profile_function!();

match &msg {
LogMsg::BeginRecordingMsg(msg) => self.add_begin_recording_msg(msg),
LogMsg::EntityPathOpMsg(msg) => {
Expand All @@ -242,12 +242,17 @@ impl LogDb {
self.entity_db.add_path_op(*msg_id, time_point, path_op);
}
LogMsg::ArrowMsg(msg) => {
// TODO(cmc): batching
self.entity_db.try_add_arrow_data_msg(msg)?;
}
LogMsg::Goodbye(_) => {}
}
self.chronological_message_ids.push(msg.id());
self.log_messages.insert(msg.id(), msg);

// TODO: oh boy, that thing again
self.chronological_message_ids.extend(msg.ids());
self.log_messages
.extend(msg.ids().into_iter().map(|msg_id| (msg_id, msg.clone()))); // TODO

Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion crates/re_log_types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ re_tuid.workspace = true
# External
ahash.workspace = true
array-init = "2.1.0"
arrow2 = { workspace = true, features = ["io_ipc", "io_print"] }
arrow2 = { workspace = true, features = ["io_ipc", "io_print", "compute_concatenate"] }
arrow2_convert.workspace = true
bytemuck = "1.11"
document-features = "0.2"
Expand Down
34 changes: 15 additions & 19 deletions crates/re_log_types/benches/msg_encode_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;

use re_log_types::{
datagen::{build_frame_nr, build_some_colors, build_some_point2d},
entity_path,
msg_bundle::MsgBundle,
ArrowMsg, DataRow, DataTable, Index, LogMsg, MsgId,
entity_path, ArrowMsg, DataRow, DataTable, Index, LogMsg, MsgId,
};

use criterion::{criterion_group, criterion_main, Criterion};
Expand Down Expand Up @@ -39,19 +37,19 @@ fn decode_log_msgs(mut bytes: &[u8]) -> Vec<LogMsg> {
messages
}

fn generate_messages(bundles: &[MsgBundle]) -> Vec<LogMsg> {
bundles
fn generate_messages(tables: &[DataTable]) -> Vec<LogMsg> {
tables
.iter()
.map(|bundle| LogMsg::ArrowMsg(ArrowMsg::try_from(bundle.clone()).unwrap()))
.map(|table| LogMsg::ArrowMsg(ArrowMsg::try_from(table.clone()).unwrap()))
.collect()
}

fn decode_message_bundles(messages: &[LogMsg]) -> Vec<MsgBundle> {
fn decode_message_bundles(messages: &[LogMsg]) -> Vec<DataTable> {
messages
.iter()
.map(|log_msg| {
if let LogMsg::ArrowMsg(arrow_msg) = log_msg {
MsgBundle::try_from(arrow_msg).unwrap()
DataTable::try_from(arrow_msg).unwrap()
} else {
unreachable!()
}
Expand All @@ -60,7 +58,7 @@ fn decode_message_bundles(messages: &[LogMsg]) -> Vec<MsgBundle> {
}

fn mono_points_arrow(c: &mut Criterion) {
fn generate_message_bundles() -> Vec<MsgBundle> {
fn generate_tables() -> Vec<DataTable> {
(0..NUM_POINTS)
.map(|i| {
DataTable::from_rows(
Expand All @@ -73,7 +71,6 @@ fn mono_points_arrow(c: &mut Criterion) {
(build_some_point2d(1), build_some_colors(1)),
)],
)
.into_msg_bundle()
})
.collect()
}
Expand All @@ -82,9 +79,9 @@ fn mono_points_arrow(c: &mut Criterion) {
let mut group = c.benchmark_group("mono_points_arrow");
group.throughput(criterion::Throughput::Elements(NUM_POINTS as _));
group.bench_function("generate_message_bundles", |b| {
b.iter(generate_message_bundles);
b.iter(generate_tables);
});
let bundles = generate_message_bundles();
let bundles = generate_tables();
group.bench_function("generate_messages", |b| {
b.iter(|| generate_messages(&bundles));
});
Expand All @@ -93,7 +90,7 @@ fn mono_points_arrow(c: &mut Criterion) {
b.iter(|| encode_log_msgs(&messages));
});
group.bench_function("encode_total", |b| {
b.iter(|| encode_log_msgs(&generate_messages(&generate_message_bundles())));
b.iter(|| encode_log_msgs(&generate_messages(&generate_tables())));
});

let encoded = encode_log_msgs(&messages);
Expand All @@ -118,7 +115,7 @@ fn mono_points_arrow(c: &mut Criterion) {
}

fn batch_points_arrow(c: &mut Criterion) {
fn generate_message_bundles() -> Vec<MsgBundle> {
fn generate_tables() -> Vec<DataTable> {
vec![DataTable::from_rows(
MsgId::ZERO,
[DataRow::from_cells2(
Expand All @@ -131,17 +128,16 @@ fn batch_points_arrow(c: &mut Criterion) {
build_some_colors(NUM_POINTS),
),
)],
)
.into_msg_bundle()]
)]
}

{
let mut group = c.benchmark_group("batch_points_arrow");
group.throughput(criterion::Throughput::Elements(NUM_POINTS as _));
group.bench_function("generate_message_bundles", |b| {
b.iter(generate_message_bundles);
b.iter(generate_tables);
});
let bundles = generate_message_bundles();
let bundles = generate_tables();
group.bench_function("generate_messages", |b| {
b.iter(|| generate_messages(&bundles));
});
Expand All @@ -150,7 +146,7 @@ fn batch_points_arrow(c: &mut Criterion) {
b.iter(|| encode_log_msgs(&messages));
});
group.bench_function("encode_total", |b| {
b.iter(|| encode_log_msgs(&generate_messages(&generate_message_bundles())));
b.iter(|| encode_log_msgs(&generate_messages(&generate_tables())));
});

let encoded = encode_log_msgs(&messages);
Expand Down
44 changes: 21 additions & 23 deletions crates/re_log_types/src/arrow_msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,23 @@
//! We have custom implementations of [`serde::Serialize`] and [`serde::Deserialize`] that wraps
//! the inner Arrow serialization of [`Schema`] and [`Chunk`].
use crate::{MsgId, TimePoint};
use crate::TimePoint;
use arrow2::{array::Array, chunk::Chunk, datatypes::Schema};

/// Message containing an Arrow payload
#[must_use]
#[derive(Clone, Debug, PartialEq)]
pub struct ArrowMsg {
/// A unique id per [`crate::LogMsg`].
pub msg_id: MsgId,
// TODO: explain, that's a tricky one
pub timepoint: TimePoint,

/// Arrow schema
/// Schema for all control & data columns.
pub schema: Schema,

/// Arrow chunk
/// Data for all control & data columns.
pub chunk: Chunk<Box<dyn Array>>,
}

impl ArrowMsg {
pub fn time_point(&self) -> Result<TimePoint, crate::msg_bundle::MsgBundleError> {
crate::msg_bundle::extract_timelines(&self.schema, &self.chunk)
}
}

#[cfg(feature = "serde")]
impl serde::Serialize for ArrowMsg {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
Expand All @@ -48,7 +42,7 @@ impl serde::Serialize for ArrowMsg {
.map_err(|e| serde::ser::Error::custom(e.to_string()))?;

let mut inner = serializer.serialize_tuple(2)?;
inner.serialize_element(&self.msg_id)?;
inner.serialize_element(&self.timepoint)?;
inner.serialize_element(&serde_bytes::ByteBuf::from(buf))?;
inner.end()
}
Expand All @@ -75,10 +69,10 @@ impl<'de> serde::Deserialize<'de> for ArrowMsg {
where
A: serde::de::SeqAccess<'de>,
{
let msg_id: Option<MsgId> = seq.next_element()?;
let timepoint: Option<TimePoint> = seq.next_element()?;
let buf: Option<serde_bytes::ByteBuf> = seq.next_element()?;

if let (Some(msg_id), Some(buf)) = (msg_id, buf) {
if let (Some(timepoint), Some(buf)) = (timepoint, buf) {
let mut cursor = std::io::Cursor::new(buf);
let metadata = read_stream_metadata(&mut cursor).unwrap();
let mut stream = StreamReader::new(cursor, metadata, None);
Expand All @@ -93,7 +87,7 @@ impl<'de> serde::Deserialize<'de> for ArrowMsg {
.ok_or_else(|| serde::de::Error::custom("No Chunk found in stream"))?;

Ok(ArrowMsg {
msg_id,
timepoint,
schema: stream.metadata().schema.clone(),
chunk,
})
Expand All @@ -112,24 +106,27 @@ impl<'de> serde::Deserialize<'de> for ArrowMsg {
#[cfg(test)]
#[cfg(feature = "serde")]
mod tests {
use super::*;

use serde_test::{assert_tokens, Token};

use super::{ArrowMsg, Chunk, MsgId, Schema};
use crate::{
datagen::{build_frame_nr, build_some_point2d, build_some_rects},
DataRow,
DataRow, MsgId,
};

#[test]
fn test_serialized_tokens() {
let schema = Schema::default();
let chunk = Chunk::new(vec![]);
let msg = ArrowMsg {
msg_id: MsgId::ZERO,
timepoint: [].into(),
schema,
chunk,
};

return; // TODO

assert_tokens(
&msg,
&[
Expand Down Expand Up @@ -163,24 +160,25 @@ mod tests {
#[test]
fn test_roundtrip_payload() {
let row = DataRow::from_cells2(
MsgId::ZERO,
MsgId::random(),
"world/rects",
[build_frame_nr(0.into())],
1,
(build_some_point2d(1), build_some_rects(1)),
);

let msg_bundle = row
.into_table(MsgId::ZERO /* not used (yet) */)
.into_msg_bundle();
let table = row.into_table(MsgId::ZERO /* not used (yet) */);

// TODO(#1619): test the full roundtrip:
// cell -> row -> table_in -> msg_in -> msg_out -> table_out
// => msg_in == msg_out
// => table_in == table_out
let msg_in: ArrowMsg = msg_bundle.try_into().unwrap();
let msg_in: ArrowMsg = table.try_into().unwrap();
let buf = rmp_serde::to_vec(&msg_in).unwrap();
let msg_out: ArrowMsg = rmp_serde::from_slice(&buf).unwrap();

// dbg!(&msg_in, &msg_out);

assert_eq!(msg_in, msg_out);
}
}
Loading

0 comments on commit 37b92b3

Please sign in to comment.