SDK batching/revamp 2: introduce RecordingContext / sunset Session (#1983)

* version crossbeam at the workspace level

* more DataRow size helpers

* DataTableBatcher (see the conceptual sketch after this commit message)

* lints

* lints

* self review

* don't expose shutdown to make errors impossible

* doc

* backport

* backport

* introduce RecordingStream (see the API sketch after the changed-files summary)

* clean up old stuff from the before time

* self-review

* ordered data columns in data tables

* tests

* even more tests

* rogue todo

* batching is now a reality

* some extra peace of mind

* revert

* lock shenanigans

* merge shenanigans

* address PR comments

* typos

Co-authored-by: Jeremy Leibs <[email protected]>

---------

Co-authored-by: Jeremy Leibs <[email protected]>
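
The rendered hunks below don't include `crates/re_sdk/src/recording_stream.rs` or the `DataTableBatcher` itself, so the following is a conceptual sketch only: all names and thresholds are hypothetical, and the real batcher is channel-based (hence the workspace-level `crossbeam` dependency), but the flush policy it implements looks roughly like this:

// Hypothetical sketch of a row batcher's flush policy; not the actual
// `DataTableBatcher` API from this PR.
struct BatcherSketch<Row> {
    pending: Vec<Row>,
    pending_bytes: u64,
    flush_num_rows: usize, // flush once this many rows have accumulated...
    flush_num_bytes: u64,  // ...or once the batch grows past this size.
}

impl<Row> BatcherSketch<Row> {
    /// Buffers one row; returns a full batch (one future `DataTable`)
    /// whenever a flush threshold is crossed.
    fn push(&mut self, row: Row, size_bytes: u64) -> Option<Vec<Row>> {
        self.pending.push(row);
        self.pending_bytes += size_bytes;
        if self.pending.len() >= self.flush_num_rows
            || self.pending_bytes >= self.flush_num_bytes
        {
            self.pending_bytes = 0;
            return Some(std::mem::take(&mut self.pending));
        }
        None
    }
}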
teh-cmc and jleibs authored May 4, 2023
1 parent eb8cf3a commit b2b3064
Showing 32 changed files with 1,313 additions and 670 deletions.
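
At the API surface, `Session`/`SessionBuilder` are sunset in favor of `RecordingStream`/`RecordingStreamBuilder` (see `crates/re_sdk/src/lib.rs` below). A minimal usage sketch; only the two type names are confirmed by this diff, the `new`/`buffered` constructor methods are assumptions:

use re_sdk::{RecordingStream, RecordingStreamBuilder};

fn main() {
    // Hypothetical usage; the builder methods are assumed, not shown in the diff.
    let rec: RecordingStream = RecordingStreamBuilder::new("my_app")
        .buffered()
        .expect("failed to spawn the recording stream");

    // Rows logged through `rec` are fed to the batcher and flushed to the
    // active sink as whole `DataTable`s, instead of one `LogMsg` per row.
    drop(rec); // dropping flushes any pending data
}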
2 changes: 2 additions & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

2 changes: 1 addition & 1 deletion crates/re_arrow_store/src/polars_util.rs
@@ -197,7 +197,7 @@ pub fn range_components<'a, const N: usize>(
 
 // --- Joins ---
 
-// TODO(#1619): none of this mess should be here
+// TODO(#1759): none of this mess should be here
 
 pub fn dataframe_from_cells<const N: usize>(
     cells: &[Option<DataCell>; N],
10 changes: 5 additions & 5 deletions crates/re_arrow_store/src/store_dump.rs
@@ -1,6 +1,6 @@
-use ahash::HashMapExt;
+use std::collections::BTreeMap;
 
 use arrow2::Either;
 use nohash_hasher::IntMap;
 use re_log_types::{
     DataCellColumn, DataTable, ErasedTimeVec, RowIdVec, TableId, TimeRange, Timeline,
 };
@@ -51,7 +51,7 @@ impl DataStore {
                     .take(table.num_rows() as _)
                     .collect(),
                 col_num_instances: col_num_instances.clone(),
-                columns: columns.clone(), // shallow
+                columns: columns.clone().into_iter().collect(), // shallow
             }
         })
     }
@@ -92,7 +92,7 @@ impl DataStore {
                     .take(col_row_id.len())
                     .collect(),
                 col_num_instances: col_num_instances.clone(),
-                columns: columns.clone(), // shallow
+                columns: columns.clone().into_iter().collect(), // shallow
             }
         })
     })
@@ -163,7 +163,7 @@ impl DataStore {
         let col_num_instances =
             filter_column(col_time, col_num_instances.iter(), time_filter).collect();
 
-        let mut columns2 = IntMap::with_capacity(columns.len());
+        let mut columns2 = BTreeMap::default();
         for (component, column) in columns {
             let column = filter_column(col_time, column.iter(), time_filter).collect();
             columns2.insert(*component, DataCellColumn(column));
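
The `IntMap`-to-`BTreeMap` switch above (and in `DataTable` below) is what the "ordered data columns in data tables" bullet refers to: a `BTreeMap` iterates its keys in sorted order, so dumped tables list their component columns deterministically. A self-contained illustration, with made-up component names:

use std::collections::BTreeMap;

fn main() {
    // A BTreeMap yields its keys in sorted order, so the columns come out
    // the same on every run; the old hash-based IntMap made no such guarantee.
    let mut columns = BTreeMap::new();
    columns.insert("rerun.point2d", vec![Some(1), None, Some(3)]);
    columns.insert("rerun.colorrgba", vec![None, Some(2), None]);

    for (component, column) in &columns {
        println!("{component}: {column:?}"); // always rerun.colorrgba first
    }
}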
2 changes: 1 addition & 1 deletion crates/re_arrow_store/src/store_polars.rs
@@ -17,7 +17,7 @@ use crate::{
 };
 
 // TODO(#1692): all of this stuff should be defined by Data{Cell,Row,Table}, not the store.
-// TODO(#1619): remove this and reimplement it on top of store serialization
+// TODO(#1759): remove this and reimplement it on top of store serialization
 
 // ---
 
5 changes: 2 additions & 3 deletions crates/re_arrow_store/src/store_write.rs
@@ -15,9 +15,8 @@ use crate::{
     IndexedTable, PersistentIndexedTable,
 };
 
-// TODO(#1619):
-// - The store should insert column-per-column rather than row-per-row (purely a performance
-//   matter)
+// TODO(cmc): the store should insert column-per-column rather than row-per-row (purely a
+// performance matter).
 
 // --- Data store ---
 
2 changes: 0 additions & 2 deletions crates/re_arrow_store/tests/data_store.rs
@@ -24,8 +24,6 @@ use re_log_types::{
     Timeline,
 };
 
-// TODO(#1619): introduce batching in the testing matrix
-
 // --- LatestComponentsAt ---
 
 #[test]
2 changes: 1 addition & 1 deletion crates/re_data_store/src/log_db.rs
@@ -61,7 +61,7 @@ impl EntityDb {
         let mut table = DataTable::from_arrow_msg(msg)?;
         table.compute_all_size_bytes();
 
-        // TODO(#1619): batch all of this
+        // TODO(cmc): batch all of this
         for row in table.to_rows() {
             self.try_add_data_row(&row)?;
         }
3 changes: 0 additions & 3 deletions crates/re_log_types/src/arrow_msg.rs
@@ -11,9 +11,6 @@ use arrow2::{array::Array, chunk::Chunk, datatypes::Schema};
 #[derive(Clone, Debug, PartialEq)]
 pub struct ArrowMsg {
     /// Unique identifier for the [`crate::DataTable`] in this message.
-    ///
-    /// NOTE(#1619): While we're in the process of transitioning towards end-to-end batching, the
-    /// `table_id` is always the same as the `row_id` as the first and only row.
     pub table_id: TableId,
 
     /// The maximum values for all timelines across the entire batch of data.
5 changes: 4 additions & 1 deletion crates/re_log_types/src/data_cell.rs
@@ -134,7 +134,7 @@ pub struct DataCellInner {
 }
 
 // TODO(cmc): We should be able to build a cell from non-reference types.
-// TODO(#1619): We shouldn't have to specify the component name separately, this should be
+// TODO(#1696): We shouldn't have to specify the component name separately, this should be
 // part of the metadata by using an extension.
 // TODO(#1696): Check that the array is indeed a leaf / component type when building a cell from an
 // arrow payload.
@@ -533,6 +533,9 @@ impl DataCell {
             inner.compute_size_bytes();
             return true;
         }
+
+        re_log::error_once!("cell size could _not_ be computed");
+
         false
     }
 }
13 changes: 2 additions & 11 deletions crates/re_log_types/src/data_row.rs
@@ -127,12 +127,6 @@ impl RowId {
     pub fn random() -> Self {
         Self(re_tuid::Tuid::random())
     }
-
-    /// Temporary utility while we transition to batching. See #1619.
-    #[doc(hidden)]
-    pub fn into_table_id(self) -> TableId {
-        TableId(self.0)
-    }
 }
 
 impl SizeBytes for RowId {
@@ -322,13 +316,10 @@ impl DataRow {
         Self::try_from_cells(row_id, timepoint, entity_path, num_instances, cells).unwrap()
     }
 
-    /// Turns the `DataRow` into a single-row [`DataTable`] that carries the same ID.
-    ///
-    /// This only makes sense as part of our transition to batching. See #1619.
-    #[doc(hidden)]
+    /// Turns the `DataRow` into a single-row [`DataTable`].
     #[inline]
     pub fn into_table(self) -> DataTable {
-        DataTable::from_rows(self.row_id.into_table_id(), [self])
+        DataTable::from_rows(TableId::random(), [self])
     }
 }
 
18 changes: 3 additions & 15 deletions crates/re_log_types/src/data_table.rs
@@ -2,7 +2,7 @@ use std::collections::BTreeMap;
 
 use ahash::HashMap;
 use itertools::Itertools as _;
-use nohash_hasher::{IntMap, IntSet};
+use nohash_hasher::IntSet;
 use smallvec::SmallVec;
 
 use crate::{
@@ -156,12 +156,6 @@ impl TableId {
     pub fn random() -> Self {
         Self(re_tuid::Tuid::random())
    }
-
-    /// Temporary utility while we transition to batching. See #1619.
-    #[doc(hidden)]
-    pub fn into_row_id(self) -> RowId {
-        RowId(self.0)
-    }
 }
 
 impl SizeBytes for TableId {
@@ -353,7 +347,7 @@ pub struct DataTable {
     ///
     /// The cells are optional since not all rows will have data for every single component
     /// (i.e. the table is sparse).
-    pub columns: IntMap<ComponentName, DataCellColumn>,
+    pub columns: BTreeMap<ComponentName, DataCellColumn>,
 }
 
 impl DataTable {
@@ -424,7 +418,7 @@ impl DataTable {
         }
 
         // Pre-allocate all columns (one per component).
-        let mut columns = IntMap::default();
+        let mut columns = BTreeMap::default();
         for component in components {
             columns.insert(
                 component,
@@ -441,12 +435,6 @@
             }
         }
 
-        if col_row_id.len() > 1 {
-            re_log::warn_once!(
-                "batching features are not ready for use, use single-row data tables instead!"
-            );
-        }
-
         Self {
             table_id,
             col_row_id,
2 changes: 1 addition & 1 deletion crates/re_log_types/src/datagen.rs
@@ -1,6 +1,6 @@
 //! Generate random data for tests and benchmarks.
-// TODO(#1619): It really is time for whole module to disappear.
+// TODO(#1810): It really is time for whole module to disappear.
 
 use crate::{
     component_types::{self, InstanceKey},
12 changes: 0 additions & 12 deletions crates/re_log_types/src/lib.rs
@@ -195,18 +195,6 @@ pub enum LogMsg {
 }
 
 impl LogMsg {
-    pub fn id(&self) -> RowId {
-        match self {
-            Self::BeginRecordingMsg(msg) => msg.row_id,
-            Self::EntityPathOpMsg(_, msg) => msg.row_id,
-            Self::Goodbye(row_id) => *row_id,
-            // TODO(#1619): the following only makes sense because, while we support sending and
-            // receiving batches, we don't actually do so yet.
-            // We need to stop storing raw `LogMsg`s before we can benefit from our batching.
-            Self::ArrowMsg(_, msg) => msg.table_id.into_row_id(),
-        }
-    }
-
     pub fn recording_id(&self) -> Option<&RecordingId> {
         match self {
             Self::BeginRecordingMsg(msg) => Some(&msg.info.recording_id),
2 changes: 2 additions & 0 deletions crates/re_sdk/Cargo.toml
@@ -41,6 +41,7 @@ re_log.workspace = true
 re_memory.workspace = true
 re_sdk_comms = { workspace = true, features = ["client"] }
 
+crossbeam.workspace = true
 document-features = "0.2"
 parking_lot.workspace = true
 thiserror.workspace = true
@@ -54,6 +55,7 @@ arrow2_convert.workspace = true
 ndarray.workspace = true
 ndarray-rand = "0.14"
 rand = "0.8"
+similar-asserts = "1.4.2"
 
 
 [build-dependencies]
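
The "version crossbeam at the workspace level" bullet pairs with the `crossbeam.workspace = true` line above: the version is pinned once in the root `Cargo.toml` (not part of the rendered hunks) and inherited by member crates. Presumably something like:

# Root Cargo.toml; the exact version number here is an assumption.
[workspace.dependencies]
crossbeam = "0.8"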
23 changes: 0 additions & 23 deletions crates/re_sdk/src/global.rs

This file was deleted.

19 changes: 7 additions & 12 deletions crates/re_sdk/src/lib.rs
@@ -9,21 +9,15 @@
 // ----------------
 // Private modules:
 
-#[cfg(feature = "global_session")]
-mod global;
-
 mod log_sink;
 mod msg_sender;
-mod session;
+mod recording_stream;
 
 // -------------
 // Public items:
 
-#[cfg(feature = "global_session")]
-pub use self::global::global_session;
-
 pub use self::msg_sender::{MsgSender, MsgSenderError};
-pub use self::session::{Session, SessionBuilder};
+pub use self::recording_stream::{RecordingStream, RecordingStreamBuilder};
 
 pub use re_sdk_comms::default_server_addr;

@@ -36,6 +30,9 @@ impl crate::sink::LogSink for re_log_encoding::FileSink {
     fn send(&self, msg: re_log_types::LogMsg) {
         re_log_encoding::FileSink::send(self, msg);
     }
+
+    #[inline]
+    fn flush_blocking(&self) {}
 }
 
 // ---------------
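
`flush_blocking` is now part of the `LogSink` contract, and the no-op impl above satisfies it for `FileSink`. A sketch of a custom sink against the trait surface visible in this diff (`send` plus `flush_blocking`; any further required items are not shown here):

use re_log_types::LogMsg;

// Hypothetical sink that just counts messages.
#[derive(Default)]
struct CountingSink(std::sync::atomic::AtomicU64);

impl re_sdk::sink::LogSink for CountingSink {
    fn send(&self, _msg: LogMsg) {
        self.0.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    }

    // Nothing is buffered, so a blocking flush has nothing to wait for.
    fn flush_blocking(&self) {}
}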
@@ -49,9 +46,7 @@ pub mod demo_util;
 /// This is how you select whether the log stream ends up
 /// sent over TCP, written to file, etc.
 pub mod sink {
-    pub use crate::log_sink::{
-        disabled, BufferedSink, LogSink, MemorySink, MemorySinkStorage, TcpSink,
-    };
+    pub use crate::log_sink::{BufferedSink, LogSink, MemorySink, MemorySinkStorage, TcpSink};
 
     #[cfg(not(target_arch = "wasm32"))]
     pub use re_log_encoding::{FileSink, FileSinkError};
@@ -153,7 +148,7 @@ pub fn decide_logging_enabled(default_enabled: bool) -> bool {
 
 // ----------------------------------------------------------------------------
 
-/// Creates a new [`re_log_types::RecordingInfo`] which can be used with [`Session::new`].
+/// Creates a new [`re_log_types::RecordingInfo`] which can be used with [`RecordingStream::new`].
 #[track_caller] // track_caller so that we can see if we are being called from an official example.
 pub fn new_recording_info(
     application_id: impl Into<re_log_types::ApplicationId>,
(The remaining changed files are not rendered here.)
