Skip to content

Commit

Permalink
self review
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc committed Mar 27, 2023
1 parent 36c2b4b commit 110dfa2
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 55 deletions.
22 changes: 11 additions & 11 deletions crates/re_data_store/examples/memory_usage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ fn live_bytes() -> usize {

// ----------------------------------------------------------------------------

use re_log_types::{entity_path, DataRow, DataTable, MsgId};
use re_log_types::{entity_path, DataRow, MsgId};

fn main() {
log_messages();
Expand Down Expand Up @@ -105,16 +105,16 @@ fn log_messages() {

{
let used_bytes_start = live_bytes();
let table = Box::new(DataTable::from_rows(
MsgId::ZERO, // not used (yet)
[DataRow::from_cells1(
let table = Box::new(
DataRow::from_cells1(
MsgId::random(),
entity_path!("points"),
[build_frame_nr(0.into())],
1,
build_some_point2d(1),
)],
));
)
.into_table(),
);
let table_bytes = live_bytes() - used_bytes_start;
let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(&*table).unwrap()));
let log_msg_bytes = live_bytes() - used_bytes_start;
Expand All @@ -128,16 +128,16 @@ fn log_messages() {

{
let used_bytes_start = live_bytes();
let table = Box::new(DataTable::from_rows(
MsgId::ZERO, // not used (yet)
[DataRow::from_cells1(
let table = Box::new(
DataRow::from_cells1(
MsgId::random(),
entity_path!("points"),
[build_frame_nr(0.into())],
NUM_POINTS as _,
build_some_point2d(NUM_POINTS),
)],
));
)
.into_table(),
);
let table_bytes = live_bytes() - used_bytes_start;
let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(&*table).unwrap()));
let log_msg_bytes = live_bytes() - used_bytes_start;
Expand Down
14 changes: 9 additions & 5 deletions crates/re_log_types/src/arrow_msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,16 @@ use arrow2::{array::Array, chunk::Chunk, datatypes::Schema};
#[derive(Clone, Debug, PartialEq)]
pub struct ArrowMsg {
/// Unique identifier for the [`DataTable`] in this message.
///
/// NOTE(#1619): While we're in the process of transitioning towards end-to-end batching, the
/// `table_id` is always the same as the `row_id` as the first and only row.
pub table_id: MsgId,

/// The minimum values for all timelines across the entire batch of data.
/// The maximum values for all timelines across the entire batch of data.
///
/// Used to timestamp the batch as a whole for e.g. latency measurements.
pub timepoint_min: TimePoint,
/// Used to timestamp the batch as a whole for e.g. latency measurements without having to
/// deserialize the arrow payload.
pub timepoint_max: TimePoint,

/// Schema for all control & data columns.
pub schema: Schema,
Expand Down Expand Up @@ -48,7 +52,7 @@ impl serde::Serialize for ArrowMsg {

let mut inner = serializer.serialize_tuple(3)?;
inner.serialize_element(&self.table_id)?;
inner.serialize_element(&self.timepoint_min)?;
inner.serialize_element(&self.timepoint_max)?;
inner.serialize_element(&serde_bytes::ByteBuf::from(buf))?;
inner.end()
}
Expand Down Expand Up @@ -97,7 +101,7 @@ impl<'de> serde::Deserialize<'de> for ArrowMsg {

Ok(ArrowMsg {
table_id,
timepoint_min,
timepoint_max: timepoint_min,
schema: stream.metadata().schema.clone(),
chunk,
})
Expand Down
2 changes: 1 addition & 1 deletion crates/re_log_types/src/data_row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ impl std::ops::IndexMut<usize> for DataCellRow {
/// # let row_id = MsgId::ZERO;
/// # let timepoint = [
/// # (Timeline::new_sequence("frame_nr"), 42.into()), //
/// # (Timeline::new_sequence("pouet"), 666.into()), //
/// # (Timeline::new_sequence("clock"), 666.into()), //
/// # ];
/// #
/// let num_instances = 2;
Expand Down
50 changes: 30 additions & 20 deletions crates/re_log_types/src/data_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,11 @@ impl std::ops::IndexMut<usize> for DataCellColumn {
/// ┌───────────────────────┬───────────────────────────────────┬────────────────────┬─────────────────────┬─────────────┬──────────────────────────────────┬─────────────────┐
/// │ rerun.row_id ┆ rerun.timepoint ┆ rerun.entity_path ┆ rerun.num_instances ┆ rerun.label ┆ rerun.point2d ┆ rerun.colorrgba │
/// ╞═══════════════════════╪═══════════════════════════════════╪════════════════════╪═════════════════════╪═════════════╪══════════════════════════════════╪═════════════════╡
/// │ {167967218, 54449486} ┆ [{frame_nr, 1, 1}, {pouet, 1, 1}] ┆ a ┆ 2 ┆ [] ┆ [{x: 10, y: 10}, {x: 20, y: 20}] ┆ [2155905279] │
/// │ {167967218, 54449486} ┆ [{frame_nr, 1, 1}, {clock, 1, 1}] ┆ a ┆ 2 ┆ [] ┆ [{x: 10, y: 10}, {x: 20, y: 20}] ┆ [2155905279] │
/// ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
/// │ {167967218, 54449486} ┆ [{frame_nr, 1, 1}, {pouet, 1, 2}] ┆ b ┆ 0 ┆ - ┆ - ┆ [] │
/// │ {167967218, 54449486} ┆ [{frame_nr, 1, 1}, {clock, 1, 2}] ┆ b ┆ 0 ┆ - ┆ - ┆ [] │
/// ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
/// │ {167967218, 54449486} ┆ [{frame_nr, 1, 2}, {pouet, 1, 1}] ┆ c ┆ 1 ┆ [hey] ┆ - ┆ [4294967295] │
/// │ {167967218, 54449486} ┆ [{frame_nr, 1, 2}, {clock, 1, 1}] ┆ c ┆ 1 ┆ [hey] ┆ - ┆ [4294967295] │
/// └───────────────────────┴───────────────────────────────────┴────────────────────┴─────────────────────┴─────────────┴──────────────────────────────────┴─────────────────┘
/// ```
///
Expand All @@ -163,10 +163,10 @@ impl std::ops::IndexMut<usize> for DataCellColumn {
/// #
/// # let table_id = MsgId::ZERO; // not used (yet)
/// #
/// # let timepoint = |frame_nr: i64, pouet: i64| {
/// # let timepoint = |frame_nr: i64, clock: i64| {
/// # TimePoint::from([
/// # (Timeline::new_sequence("frame_nr"), frame_nr.into()),
/// # (Timeline::new_sequence("pouet"), pouet.into()),
/// # (Timeline::new_sequence("clock"), clock.into()),
/// # ])
/// # };
/// #
Expand Down Expand Up @@ -356,12 +356,13 @@ impl DataTable {
})
}

/// Computes the minimum value for each and every timeline present across this entire table,
/// Computes the maximum value for each and every timeline present across this entire table,
/// and returns the corresponding [`TimePoint`].
pub fn min_timepoint(&self) -> TimePoint {
#[inline]
pub fn timepoint_max(&self) -> TimePoint {
self.timepoint
.iter()
.fold(TimePoint::timeless(), |acc, tp| acc.min_union(tp))
.fold(TimePoint::timeless(), |acc, tp| acc.union_max(tp))
}
}

Expand Down Expand Up @@ -393,6 +394,15 @@ pub const METADATA_TABLE_ID: &str = "rerun.table_id";

impl DataTable {
/// Serializes the entire table into an arrow payload and schema.
///
/// A serialized `DataTable` contains two kinds of columns: control & data.
///
/// * Control columns are those that drive the behavior of the storage systems.
/// They are always present, always dense, and always deserialized upon reception by the
/// server.
/// * Data columns are the one that hold component data.
/// They are optional, potentially sparse, and never deserialized on the server-side (not by
/// the storage systems, at least).
pub fn serialize(&self) -> DataTableResult<(Schema, Chunk<Box<dyn Array>>)> {
let mut schema = Schema::default();
let mut columns = Vec::new();
Expand Down Expand Up @@ -498,8 +508,8 @@ impl DataTable {

/// Serializes all data columns into an arrow payload and schema.
///
/// Data columns are the one that hold component data.
/// They are optional, potentially sparse, and never deserialized on the server-side.
/// They are optional, potentially sparse, and never deserialized on the server-side (not by
/// the storage systems, at least).
fn serialize_data_columns(&self) -> DataTableResult<(Schema, Vec<Box<dyn Array>>)> {
let Self {
table_id: _,
Expand Down Expand Up @@ -580,9 +590,11 @@ impl DataTable {

impl DataTable {
/// Deserializes an entire table from an arrow payload and schema.
pub fn deserialize(schema: &Schema, chunk: &Chunk<Box<dyn Array>>) -> DataTableResult<Self> {
let table_id = MsgId::ZERO; // not used (yet)

pub fn deserialize(
table_id: MsgId,
schema: &Schema,
chunk: &Chunk<Box<dyn Array>>,
) -> DataTableResult<Self> {
let control_indices: HashMap<&str, usize> = schema
.fields
.iter()
Expand Down Expand Up @@ -669,15 +681,12 @@ impl TryFrom<&ArrowMsg> for DataTable {
fn try_from(msg: &ArrowMsg) -> DataTableResult<Self> {
let ArrowMsg {
table_id,
timepoint_min: _,
timepoint_max: _,
schema,
chunk,
} = msg;

Ok(Self {
table_id: *table_id,
..Self::deserialize(schema, chunk)?
})
Self::deserialize(*table_id, schema, chunk)
}
}

Expand All @@ -686,12 +695,12 @@ impl TryFrom<&DataTable> for ArrowMsg {

#[inline]
fn try_from(table: &DataTable) -> DataTableResult<Self> {
let timepoint_min = table.min_timepoint();
let timepoint_min = table.timepoint_max();
let (schema, chunk) = table.serialize()?;

Ok(ArrowMsg {
table_id: table.table_id,
timepoint_min,
timepoint_max: timepoint_min,
schema,
chunk,
})
Expand All @@ -706,6 +715,7 @@ impl std::fmt::Display for DataTable {
re_log::error_once!("couldn't display data table: {err}");
std::fmt::Error
})?;
writeln!(f, "DataTable({}):", self.table_id)?;
re_format::arrow::format_table(
columns.columns(),
schema.fields.iter().map(|field| field.name.as_str()),
Expand Down
7 changes: 4 additions & 3 deletions crates/re_log_types/src/time_point/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,17 @@ impl TimePoint {
self.0.iter()
}

/// Computes the union of two `TimePoint`s, keeping the minimal value in case of conflicts.
pub fn min_union(mut self, rhs: &Self) -> Self {
/// Computes the union of two `TimePoint`s, keeping the maximum time value in case of
/// conflicts.
pub fn union_max(mut self, rhs: &Self) -> Self {
for (&timeline, &time) in rhs {
match self.0.entry(timeline) {
btree_map::Entry::Vacant(entry) => {
entry.insert(time);
}
btree_map::Entry::Occupied(mut entry) => {
let entry = entry.get_mut();
*entry = TimeInt::min(*entry, time);
*entry = TimeInt::max(*entry, time);
}
}
}
Expand Down
14 changes: 1 addition & 13 deletions crates/re_sdk/src/msg_sender.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
use re_log_types::{component_types::InstanceKey, DataRow, DataTableError};

use nohash_hasher::IntMap;

use crate::{
components::Transform,
log::{DataCell, LogMsg, MsgId},
sink::LogSink,
time::{Time, TimeInt, TimePoint, Timeline},
Component, ComponentName, EntityPath, SerializableComponent,
Component, EntityPath, SerializableComponent,
};

// TODO(#1619): Rust SDK batching
Expand Down Expand Up @@ -294,16 +292,6 @@ impl MsgSender {
.collect();
debug_assert!(all_cells.into_iter().all(|cell| cell.is_none()));

// sanity check: no row-level batching
let mut rows_per_comptype: IntMap<ComponentName, usize> = IntMap::default();
for cell in standard_cells
.iter()
.chain(&transform_cells)
.chain(&splatted)
{
*rows_per_comptype.entry(cell.component_name()).or_default() += 1;
}

// sanity check: transforms can't handle multiple instances
let num_transform_instances = transform_cells
.get(0)
Expand Down
2 changes: 1 addition & 1 deletion crates/re_sdk_comms/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ impl CongestionManager {
// we don't want to drop any of these
LogMsg::BeginRecordingMsg(_) | LogMsg::EntityPathOpMsg(_) | LogMsg::Goodbye(_) => true,

LogMsg::ArrowMsg(arrow_msg) => self.should_send_time_point(&arrow_msg.timepoint_min),
LogMsg::ArrowMsg(arrow_msg) => self.should_send_time_point(&arrow_msg.timepoint_max),
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/re_viewer/src/ui/data_ui/log_msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ impl DataUi for EntityPathOpMsg {
}
}

// TODO
impl DataUi for ArrowMsg {
fn data_ui(
&self,
Expand All @@ -113,6 +112,7 @@ impl DataUi for ArrowMsg {
}
};

// TODO(cmc): Come up with something a bit nicer once data tables become a common sight.
for row in table.as_rows() {
egui::Grid::new("fields").num_columns(2).show(ui, |ui| {
ui.monospace("entity_path:");
Expand Down
3 changes: 3 additions & 0 deletions crates/re_viewer/src/ui/event_log_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ fn table_row(
path_op.data_ui(ctx, ui, UiVerbosity::All, &query);
});
}
// NOTE: This really only makes sense because we don't yet have batches with more than a
// single row at the moment... and by the time we do, the event log view will have
// disappeared entirely.
LogMsg::ArrowMsg(msg) => match DataTable::try_from(msg) {
Ok(table) => {
for datarow in table.as_rows() {
Expand Down

0 comments on commit 110dfa2

Please sign in to comment.