Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python SDK: multi-recording & multi-threading redesign #2061

Merged
merged 26 commits into from
May 15, 2023
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
4357474
implement multi-recording support via RecordingStream
teh-cmc May 11, 2023
e3e9a91
please get me out of here
teh-cmc May 11, 2023
6f21c50
random fixes
teh-cmc May 11, 2023
c8c9d61
no need to flush manually for notebooks anymore
teh-cmc May 11, 2023
7b737e6
jupyter deadlock from hell
teh-cmc May 11, 2023
1448c84
tornado >6.1 doesn't work with recent jupyters
teh-cmc May 11, 2023
397a5b9
Merge remote-tracking branch 'origin/main' into cmc/py/recordingstrea…
teh-cmc May 12, 2023
b14f2ba
Make every RecordingId typed and preclude the existence of 'Defaults'
jleibs May 12, 2023
ab9eb67
docstrings
jleibs May 12, 2023
68ea69a
Merge branch 'main' into cmc/py/recordingstream_exposed
teh-cmc May 15, 2023
6cff3d3
Merge branch 'main' into jleibs/explicit_recording_ids
teh-cmc May 15, 2023
8b9f4f0
addressing minor PR comments
teh-cmc May 15, 2023
cf457c8
RecordingStream in charge of Goodbye payloads
teh-cmc May 15, 2023
7280db7
fmt
teh-cmc May 15, 2023
0a9af05
Merge remote-tracking branch 'origin/jleibs/explicit_recording_ids' i…
teh-cmc May 15, 2023
e060b77
addressed PR comments
teh-cmc May 15, 2023
35782c5
say goodbye when gracefully disconnecting
teh-cmc May 15, 2023
5bacc1e
Merge branch 'jleibs/explicit_recording_ids' into cmc/py/recordingstr…
teh-cmc May 15, 2023
d3afddb
maintain status quo regarding opened recordings
teh-cmc May 15, 2023
59f79cc
init shall not return a recording
teh-cmc May 15, 2023
7f62883
tryin to document stuff
teh-cmc May 15, 2023
18a7403
directly expose batcher stuff in the sdk
teh-cmc May 15, 2023
58dd756
lints
teh-cmc May 15, 2023
ff4bcd7
bruh
teh-cmc May 15, 2023
f9383e6
more doc stuff i guess
teh-cmc May 15, 2023
56049ca
Merge remote-tracking branch 'origin/main' into cmc/py/recordingstrea…
teh-cmc May 15, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/re_data_store/examples/memory_usage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ fn live_bytes() -> usize {

// ----------------------------------------------------------------------------

use re_log_types::{entity_path, DataRow, RecordingId, RowId};
use re_log_types::{entity_path, DataRow, RecordingId, RecordingType, RowId};

fn main() {
log_messages();
Expand Down Expand Up @@ -91,7 +91,7 @@ fn log_messages() {

const NUM_POINTS: usize = 1_000;

let recording_id = RecordingId::random();
let recording_id = RecordingId::random(RecordingType::Data);
let timeline = Timeline::new_sequence("frame_nr");
let mut time_point = TimePoint::default();
time_point.insert(timeline, TimeInt::from(0));
Expand Down
27 changes: 18 additions & 9 deletions crates/re_data_store/src/log_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,10 @@ impl EntityDb {
// ----------------------------------------------------------------------------

/// A in-memory database built from a stream of [`LogMsg`]es.
#[derive(Default)]
pub struct LogDb {
/// The [`RecordingId`] for this log.
recording_id: RecordingId,

/// All [`EntityPathOpMsg`]s ever received.
entity_op_msgs: BTreeMap<RowId, EntityPathOpMsg>,

Expand All @@ -175,6 +177,16 @@ pub struct LogDb {
}

impl LogDb {
pub fn new(recording_id: RecordingId) -> Self {
Self {
recording_id,
entity_op_msgs: Default::default(),
data_source: None,
recording_msg: None,
entity_db: Default::default(),
}
}

pub fn recording_msg(&self) -> Option<&BeginRecordingMsg> {
self.recording_msg.as_ref()
}
Expand All @@ -183,12 +195,8 @@ impl LogDb {
self.recording_msg().map(|msg| &msg.info)
}

pub fn recording_id(&self) -> RecordingId {
if let Some(msg) = &self.recording_msg {
msg.info.recording_id.clone()
} else {
RecordingId::unknown()
}
pub fn recording_id(&self) -> &RecordingId {
&self.recording_id
}

pub fn timelines(&self) -> impl ExactSizeIterator<Item = &Timeline> {
Expand All @@ -208,7 +216,7 @@ impl LogDb {
+ self.entity_db.data_store.num_temporal_rows() as usize
}

pub fn is_default(&self) -> bool {
pub fn is_empty(&self) -> bool {
self.recording_msg.is_none() && self.num_rows() == 0
}

Expand All @@ -227,7 +235,7 @@ impl LogDb {
self.entity_db.add_path_op(*row_id, time_point, path_op);
}
LogMsg::ArrowMsg(_, inner) => self.entity_db.try_add_arrow_msg(inner)?,
LogMsg::Goodbye(_) => {}
LogMsg::Goodbye(_, _) => {}
}

Ok(())
Expand Down Expand Up @@ -264,6 +272,7 @@ impl LogDb {
let cutoff_times = self.entity_db.data_store.oldest_time_per_timeline();

let Self {
recording_id: _,
entity_op_msgs,
data_source: _,
recording_msg: _,
Expand Down
2 changes: 1 addition & 1 deletion crates/re_data_ui/src/log_msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl DataUi for LogMsg {
LogMsg::BeginRecordingMsg(msg) => msg.data_ui(ctx, ui, verbosity, query),
LogMsg::EntityPathOpMsg(_, msg) => msg.data_ui(ctx, ui, verbosity, query),
LogMsg::ArrowMsg(_, msg) => msg.data_ui(ctx, ui, verbosity, query),
LogMsg::Goodbye(_) => {
LogMsg::Goodbye(_, _) => {
ui.label("Goodbye");
}
}
Expand Down
8 changes: 4 additions & 4 deletions crates/re_log_encoding/benches/msg_encode_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;

use re_log_types::{
datagen::{build_frame_nr, build_some_colors, build_some_point2d},
entity_path, DataRow, DataTable, Index, LogMsg, RecordingId, RowId, TableId,
entity_path, DataRow, DataTable, Index, LogMsg, RecordingId, RecordingType, RowId, TableId,
};

use criterion::{criterion_group, criterion_main, Criterion};
Expand Down Expand Up @@ -81,7 +81,7 @@ fn mono_points_arrow(c: &mut Criterion) {
}

{
let recording_id = RecordingId::random();
let recording_id = RecordingId::random(RecordingType::Data);
let mut group = c.benchmark_group("mono_points_arrow");
group.throughput(criterion::Throughput::Elements(NUM_POINTS as _));
group.bench_function("generate_message_bundles", |b| {
Expand Down Expand Up @@ -137,7 +137,7 @@ fn mono_points_arrow_batched(c: &mut Criterion) {
}

{
let recording_id = RecordingId::random();
let recording_id = RecordingId::random(RecordingType::Data);
let mut group = c.benchmark_group("mono_points_arrow_batched");
group.throughput(criterion::Throughput::Elements(NUM_POINTS as _));
group.bench_function("generate_message_bundles", |b| {
Expand Down Expand Up @@ -194,7 +194,7 @@ fn batch_points_arrow(c: &mut Criterion) {
}

{
let recording_id = RecordingId::random();
let recording_id = RecordingId::random(RecordingType::Data);
let mut group = c.benchmark_group("batch_points_arrow");
group.throughput(criterion::Throughput::Elements(NUM_POINTS as _));
group.bench_function("generate_message_bundles", |b| {
Expand Down
4 changes: 2 additions & 2 deletions crates/re_log_encoding/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,14 +184,14 @@ impl<R: std::io::Read> Iterator for Decoder<R> {
fn test_encode_decode() {
use re_log_types::{
ApplicationId, BeginRecordingMsg, LogMsg, RecordingId, RecordingInfo, RecordingSource,
RowId, Time,
RecordingType, RowId, Time,
};

let messages = vec![LogMsg::BeginRecordingMsg(BeginRecordingMsg {
row_id: RowId::random(),
info: RecordingInfo {
application_id: ApplicationId("test".to_owned()),
recording_id: RecordingId::random(),
recording_id: RecordingId::random(RecordingType::Data),
is_official_example: true,
started: Time::now(),
recording_source: RecordingSource::RustSdk {
Expand Down
102 changes: 47 additions & 55 deletions crates/re_log_types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,78 +89,70 @@ macro_rules! impl_into_enum {

// ----------------------------------------------------------------------------

/// A unique id per recording (a stream of [`LogMsg`]es).
#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
/// What type of `Recording` this is.
///
/// `Data` recordings contain user-data logged via `log_` API calls.
/// `Blueprint` recordings describe how that data is laid out.
///
/// Both of these types can go over the same stream and be stored in the
/// same datastore, but the viewer wants to treat them very differently.
#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub struct RecordingId(Arc<String>);

impl Default for RecordingId {
fn default() -> Self {
Self::unknown()
}
pub enum RecordingType {
Data,
Blueprint,
}

impl RecordingId {
impl std::fmt::Display for RecordingType {
#[inline]
pub fn unknown() -> Self {
"UNKNOWN".into()
}

#[inline]
pub fn random() -> Self {
Self(Arc::new(uuid::Uuid::new_v4().to_string()))
}

#[inline]
pub fn from_uuid(uuid: uuid::Uuid) -> Self {
Self(Arc::new(uuid.to_string()))
}
}

impl std::fmt::Display for RecordingId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}

impl From<&str> for RecordingId {
fn from(s: &str) -> Self {
Self(Arc::new(s.to_owned()))
match self {
Self::Data => "Data".fmt(f),
Self::Blueprint => "Blueprint".fmt(f),
}
}
}

impl From<String> for RecordingId {
fn from(s: String) -> Self {
Self(Arc::new(s))
}
/// A unique id per recording (a stream of [`LogMsg`]es).
#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub struct RecordingId {
variant: RecordingType,
id: Arc<String>,
}

impl RecordingId {
#[inline]
pub fn as_str(&self) -> &str {
self.0.as_str()
pub fn random(variant: RecordingType) -> Self {
Self {
variant,
id: Arc::new(uuid::Uuid::new_v4().to_string()),
}
}
}

impl AsRef<str> for RecordingId {
#[inline]
fn as_ref(&self) -> &str {
self.as_str()
pub fn from_uuid(variant: RecordingType, uuid: uuid::Uuid) -> Self {
Self {
variant,
id: Arc::new(uuid.to_string()),
}
}
}

impl std::borrow::Borrow<str> for RecordingId {
#[inline]
fn borrow(&self) -> &str {
self.as_str()
pub fn from_string(variant: RecordingType, str: String) -> Self {
Self {
variant,
id: Arc::new(str),
}
}
}

impl std::ops::Deref for RecordingId {
type Target = str;
impl std::fmt::Display for RecordingId {
#[inline]
fn deref(&self) -> &str {
self.as_str()
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self { variant, id } = self;
f.write_fmt(format_args!("{variant}:{id}"))?;
Ok(())
}
}

Expand Down Expand Up @@ -221,17 +213,17 @@ pub enum LogMsg {
ArrowMsg(RecordingId, ArrowMsg),

/// Sent when the client shuts down the connection.
Goodbye(RowId),
Goodbye(RecordingId, RowId),
}

impl LogMsg {
pub fn recording_id(&self) -> Option<&RecordingId> {
pub fn recording_id(&self) -> &RecordingId {
match self {
Self::BeginRecordingMsg(msg) => Some(&msg.info.recording_id),
Self::BeginRecordingMsg(msg) => &msg.info.recording_id,
Self::EntityPathOpMsg(recording_id, _) | Self::ArrowMsg(recording_id, _) => {
Some(recording_id)
recording_id
}
Self::Goodbye(_) => None,
Self::Goodbye(recording_id, _) => recording_id,
}
}
}
Expand Down
7 changes: 1 addition & 6 deletions crates/re_sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@ demo = []
## Add support for some math operations using [`glam`](https://crates.io/crates/glam/).
glam = ["re_log_types/glam"]

## Add the `global_session` method.
global_session = ["dep:once_cell"]

## Integration with the [`image`](https://crates.io/crates/image/) crate.
image = ["re_log_types/image"]

Expand All @@ -44,12 +41,10 @@ re_sdk_comms = { workspace = true, features = ["client"] }
ahash.workspace = true
crossbeam.workspace = true
document-features = "0.2"
once_cell = "1.12"
parking_lot.workspace = true
thiserror.workspace = true

# Optional dependencies:
once_cell = { version = "1.12", optional = true }


[dev-dependencies]
arrow2_convert.workspace = true
Expand Down
Loading