Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc committed Mar 23, 2023
1 parent eecff1a commit 9e4661e
Show file tree
Hide file tree
Showing 23 changed files with 668 additions and 356 deletions.
1 change: 1 addition & 0 deletions crates/re_arrow_store/src/store_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub type WriteResult<T> = ::std::result::Result<T, WriteError>;

impl DataStore {
// TODO: is there anything smarter to do server-side batching wise?
// TODO: drive the insertion by columns
pub fn insert_table(&mut self, table: &DataTable) -> WriteResult<()> {
// TODO: explain that the magic of batching is in how the data is all in the same place
// TODO: need coalescing server-side too
Expand Down
58 changes: 26 additions & 32 deletions crates/re_data_store/examples/memory_usage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,23 +105,20 @@ fn log_messages() {

{
let used_bytes_start = live_bytes();
let msg_bundle = Box::new(
DataTable::from_rows(
MsgId::ZERO, // not used (yet)
[DataRow::from_cells1(
MsgId::random(),
entity_path!("points"),
[build_frame_nr(0.into())],
1,
build_some_point2d(1),
)],
)
.into_msg_bundle(),
);
let msg_bundle_bytes = live_bytes() - used_bytes_start;
let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(*msg_bundle).unwrap()));
let table = Box::new(DataTable::from_rows(
MsgId::ZERO, // not used (yet)
[DataRow::from_cells1(
MsgId::random(),
entity_path!("points"),
[build_frame_nr(0.into())],
1,
build_some_point2d(1),
)],
));
let table_bytes = live_bytes() - used_bytes_start;
let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(*table).unwrap()));
let log_msg_bytes = live_bytes() - used_bytes_start;
println!("Arrow MsgBundle containing a Pos2 uses {msg_bundle_bytes} bytes in RAM");
println!("Arrow payload containing a Pos2 uses {table_bytes} bytes in RAM");
let encoded = encode_log_msg(&log_msg);
println!(
"Arrow LogMsg containing a Pos2 uses {}-{log_msg_bytes} bytes in RAM, and {} bytes encoded",
Expand All @@ -131,23 +128,20 @@ fn log_messages() {

{
let used_bytes_start = live_bytes();
let msg_bundle = Box::new(
DataTable::from_rows(
MsgId::ZERO, // not used (yet)
[DataRow::from_cells1(
MsgId::random(),
entity_path!("points"),
[build_frame_nr(0.into())],
NUM_POINTS as _,
build_some_point2d(NUM_POINTS),
)],
)
.into_msg_bundle(),
);
let msg_bundle_bytes = live_bytes() - used_bytes_start;
let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(*msg_bundle).unwrap()));
let table = Box::new(DataTable::from_rows(
MsgId::ZERO, // not used (yet)
[DataRow::from_cells1(
MsgId::random(),
entity_path!("points"),
[build_frame_nr(0.into())],
NUM_POINTS as _,
build_some_point2d(NUM_POINTS),
)],
));
let table_bytes = live_bytes() - used_bytes_start;
let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(*table).unwrap()));
let log_msg_bytes = live_bytes() - used_bytes_start;
println!("Arrow MsgBundle containing a Pos2 uses {msg_bundle_bytes} bytes in RAM");
println!("Arrow payload containing a Pos2 uses {table_bytes} bytes in RAM");
let encoded = encode_log_msg(&log_msg);
println!(
"Arrow LogMsg containing {NUM_POINTS}x Pos2 uses {}-{log_msg_bytes} bytes in RAM, and {} bytes encoded",
Expand Down
13 changes: 9 additions & 4 deletions crates/re_data_store/src/log_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,7 @@ impl EntityDb {
}

fn try_add_arrow_data_msg(&mut self, msg: &ArrowMsg) -> Result<(), Error> {
let msg_bundle = MsgBundle::try_from(msg).map_err(Error::MsgBundleError)?;
let table = DataTable::from_msg_bundle(msg_bundle);
let table: DataTable = msg.try_into().unwrap(); // TODO

// TODO(cmc): batching
for row in table.as_rows() {
Expand Down Expand Up @@ -233,6 +232,7 @@ impl LogDb {

pub fn add(&mut self, msg: LogMsg) -> Result<(), Error> {
crate::profile_function!();

match &msg {
LogMsg::BeginRecordingMsg(msg) => self.add_begin_recording_msg(msg),
LogMsg::EntityPathOpMsg(msg) => {
Expand All @@ -244,12 +244,17 @@ impl LogDb {
self.entity_db.add_path_op(*msg_id, time_point, path_op);
}
LogMsg::ArrowMsg(msg) => {
// TODO(cmc): batching
self.entity_db.try_add_arrow_data_msg(msg)?;
}
LogMsg::Goodbye(_) => {}
}
self.chronological_message_ids.push(msg.id());
self.log_messages.insert(msg.id(), msg);

// TODO: oh boy, that thing again
self.chronological_message_ids.extend(msg.ids());
self.log_messages
.extend(msg.ids().into_iter().map(|msg_id| (msg_id, msg.clone()))); // TODO

Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion crates/re_log_types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ re_tuid.workspace = true
# External
ahash.workspace = true
array-init = "2.1.0"
arrow2 = { workspace = true, features = ["io_ipc", "io_print"] }
arrow2 = { workspace = true, features = ["io_ipc", "io_print", "compute_concatenate"] }
arrow2_convert.workspace = true
bytemuck = "1.11"
document-features = "0.2"
Expand Down
58 changes: 26 additions & 32 deletions crates/re_log_types/benches/msg_encode_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;

use re_log_types::{
datagen::{build_frame_nr, build_some_colors, build_some_point2d},
entity_path,
msg_bundle::MsgBundle,
ArrowMsg, DataRow, DataTable, Index, LogMsg, MsgId,
entity_path, ArrowMsg, DataRow, DataTable, Index, LogMsg, MsgId,
};

use criterion::{criterion_group, criterion_main, Criterion};
Expand Down Expand Up @@ -39,19 +37,19 @@ fn decode_log_msgs(mut bytes: &[u8]) -> Vec<LogMsg> {
messages
}

fn generate_messages(bundles: &[MsgBundle]) -> Vec<LogMsg> {
bundles
fn generate_messages(tables: &[DataTable]) -> Vec<LogMsg> {
tables
.iter()
.map(|bundle| LogMsg::ArrowMsg(ArrowMsg::try_from(bundle.clone()).unwrap()))
.map(|table| LogMsg::ArrowMsg(ArrowMsg::try_from(table.clone()).unwrap()))
.collect()
}

fn decode_message_bundles(messages: &[LogMsg]) -> Vec<MsgBundle> {
fn decode_message_bundles(messages: &[LogMsg]) -> Vec<DataTable> {
messages
.iter()
.map(|log_msg| {
if let LogMsg::ArrowMsg(arrow_msg) = log_msg {
MsgBundle::try_from(arrow_msg).unwrap()
DataTable::try_from(arrow_msg).unwrap()
} else {
unreachable!()
}
Expand All @@ -60,7 +58,7 @@ fn decode_message_bundles(messages: &[LogMsg]) -> Vec<MsgBundle> {
}

fn mono_points_arrow(c: &mut Criterion) {
fn generate_message_bundles() -> Vec<MsgBundle> {
fn generate_tables() -> Vec<DataTable> {
(0..NUM_POINTS)
.map(|i| {
DataTable::from_rows(
Expand All @@ -73,7 +71,6 @@ fn mono_points_arrow(c: &mut Criterion) {
(build_some_point2d(1), build_some_colors(1)),
)],
)
.into_msg_bundle()
})
.collect()
}
Expand All @@ -82,9 +79,9 @@ fn mono_points_arrow(c: &mut Criterion) {
let mut group = c.benchmark_group("mono_points_arrow");
group.throughput(criterion::Throughput::Elements(NUM_POINTS as _));
group.bench_function("generate_message_bundles", |b| {
b.iter(generate_message_bundles);
b.iter(generate_tables);
});
let bundles = generate_message_bundles();
let bundles = generate_tables();
group.bench_function("generate_messages", |b| {
b.iter(|| generate_messages(&bundles));
});
Expand All @@ -93,7 +90,7 @@ fn mono_points_arrow(c: &mut Criterion) {
b.iter(|| encode_log_msgs(&messages));
});
group.bench_function("encode_total", |b| {
b.iter(|| encode_log_msgs(&generate_messages(&generate_message_bundles())));
b.iter(|| encode_log_msgs(&generate_messages(&generate_tables())));
});

let encoded = encode_log_msgs(&messages);
Expand All @@ -118,32 +115,29 @@ fn mono_points_arrow(c: &mut Criterion) {
}

fn batch_points_arrow(c: &mut Criterion) {
fn generate_message_bundles() -> Vec<MsgBundle> {
vec![
DataTable::from_rows(
fn generate_tables() -> Vec<DataTable> {
vec![DataTable::from_rows(
MsgId::ZERO,
[DataRow::from_cells2(
MsgId::ZERO,
[DataRow::from_cells2(
MsgId::ZERO,
entity_path!("points"),
[build_frame_nr(0.into())],
1,
(
build_some_point2d(NUM_POINTS),
build_some_colors(NUM_POINTS),
),
)],
)
.into_msg_bundle(), //
]
entity_path!("points"),
[build_frame_nr(0.into())],
1,
(
build_some_point2d(NUM_POINTS),
build_some_colors(NUM_POINTS),
),
)],
)]
}

{
let mut group = c.benchmark_group("batch_points_arrow");
group.throughput(criterion::Throughput::Elements(NUM_POINTS as _));
group.bench_function("generate_message_bundles", |b| {
b.iter(generate_message_bundles);
b.iter(generate_tables);
});
let bundles = generate_message_bundles();
let bundles = generate_tables();
group.bench_function("generate_messages", |b| {
b.iter(|| generate_messages(&bundles));
});
Expand All @@ -152,7 +146,7 @@ fn batch_points_arrow(c: &mut Criterion) {
b.iter(|| encode_log_msgs(&messages));
});
group.bench_function("encode_total", |b| {
b.iter(|| encode_log_msgs(&generate_messages(&generate_message_bundles())));
b.iter(|| encode_log_msgs(&generate_messages(&generate_tables())));
});

let encoded = encode_log_msgs(&messages);
Expand Down
30 changes: 11 additions & 19 deletions crates/re_log_types/src/arrow_msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,16 @@ use arrow2::{array::Array, chunk::Chunk, datatypes::Schema};
#[must_use]
#[derive(Clone, Debug, PartialEq)]
pub struct ArrowMsg {
/// A unique id per [`crate::LogMsg`].
pub msg_id: MsgId,
// TODO: explain, that's a tricky one
pub timepoint: TimePoint,

/// Arrow schema
/// Schema for all control & data columns.
pub schema: Schema,

/// Arrow chunk
/// Data for all control & data columns.
pub chunk: Chunk<Box<dyn Array>>,
}

impl ArrowMsg {
/// Computes the [`TimePoint`] for this message from its Arrow `schema` and `chunk`.
///
/// This is a thin wrapper: it forwards `&self.schema` and `&self.chunk` to
/// `crate::msg_bundle::extract_timelines` and returns that call's result unchanged,
/// including any `MsgBundleError` it reports.
///
/// NOTE(review): presumably the timepoint is decoded from timeline columns stored in
/// the chunk — confirm against `extract_timelines`.
pub fn time_point(&self) -> Result<TimePoint, crate::msg_bundle::MsgBundleError> {
crate::msg_bundle::extract_timelines(&self.schema, &self.chunk)
}
}

#[cfg(feature = "serde")]
impl serde::Serialize for ArrowMsg {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
Expand All @@ -48,7 +42,7 @@ impl serde::Serialize for ArrowMsg {
.map_err(|e| serde::ser::Error::custom(e.to_string()))?;

let mut inner = serializer.serialize_tuple(2)?;
inner.serialize_element(&self.msg_id)?;
inner.serialize_element(&self.timepoint)?;
inner.serialize_element(&serde_bytes::ByteBuf::from(buf))?;
inner.end()
}
Expand All @@ -75,10 +69,10 @@ impl<'de> serde::Deserialize<'de> for ArrowMsg {
where
A: serde::de::SeqAccess<'de>,
{
let msg_id: Option<MsgId> = seq.next_element()?;
let timepoint: Option<TimePoint> = seq.next_element()?;
let buf: Option<serde_bytes::ByteBuf> = seq.next_element()?;

if let (Some(msg_id), Some(buf)) = (msg_id, buf) {
if let (Some(timepoint), Some(buf)) = (timepoint, buf) {
let mut cursor = std::io::Cursor::new(buf);
let metadata = read_stream_metadata(&mut cursor).unwrap();
let mut stream = StreamReader::new(cursor, metadata, None);
Expand All @@ -93,7 +87,7 @@ impl<'de> serde::Deserialize<'de> for ArrowMsg {
.ok_or_else(|| serde::de::Error::custom("No Chunk found in stream"))?;

Ok(ArrowMsg {
msg_id,
timepoint,
schema: stream.metadata().schema.clone(),
chunk,
})
Expand Down Expand Up @@ -125,7 +119,7 @@ mod tests {
let schema = Schema::default();
let chunk = Chunk::new(vec![]);
let msg = ArrowMsg {
msg_id: MsgId::ZERO,
timepoint: [].into(),
schema,
chunk,
};
Expand Down Expand Up @@ -170,12 +164,10 @@ mod tests {
(build_some_point2d(1), build_some_rects(1)),
);

let msg_bundle = row
.into_table(MsgId::ZERO /* not used (yet) */)
.into_msg_bundle();
let table = row.into_table(MsgId::ZERO /* not used (yet) */);

// TODO(cmc): that's not a full roundtrip though
let msg_in: ArrowMsg = msg_bundle.try_into().unwrap();
let msg_in: ArrowMsg = table.try_into().unwrap();
let buf = rmp_serde::to_vec(&msg_in).unwrap();
let msg_out: ArrowMsg = rmp_serde::from_slice(&buf).unwrap();
assert_eq!(msg_in, msg_out);
Expand Down
5 changes: 5 additions & 0 deletions crates/re_log_types/src/component_types/msg_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ impl MsgId {
self.0.as_u128()
}

/// Returns `nanoseconds_since_epoch()` of the wrapped inner id (`self.0`).
///
/// NOTE(review): presumably this is the creation timestamp embedded in the underlying
/// id, expressed in nanoseconds since the Unix epoch — confirm against the inner type.
#[inline]
pub fn nanoseconds_since_epoch(&self) -> u64 {
self.0.nanoseconds_since_epoch()
}

/// A shortened string representation of the message id.
#[inline]
pub fn short_string(&self) -> String {
Expand Down
9 changes: 0 additions & 9 deletions crates/re_log_types/src/data_row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,6 @@ impl DataRow {
cells,
};

// TODO(cmc): Since we don't yet support mixing splatted data within instanced rows,
// we need to craft an array of `MsgId`s that matches the length of the other components.
// TODO(cmc): This goes away with batching & al
if !components.contains(&MsgId::name()) {
this.cells.push(DataCell::from_native(
vec![row_id; this.num_instances() as _].iter(),
));
}

this
}

Expand Down
Loading

0 comments on commit 9e4661e

Please sign in to comment.