Skip to content

Commit

Permalink
Python SDK: multi-recording & multi-threading redesign (#2061)
Browse files Browse the repository at this point in the history
* implement multi-recording support via RecordingStream

* please get me out of here

* random fixes

* no need to flush manually for notebooks anymore

* jupyter deadlock from hell

* tornado >6.1 doesn't work with recent jupyters

* Make every RecordingId typed and preclude the existence of 'Defaults'

* docstrings

* addressing minor PR comments

* RecordingStream in charge of Goodbye payloads

* fmt

* addressed PR comments

* say goodbye when gracefully disconnecting

* maintain status quo regarding opened recordings

* init shall not return a recording

* tryin to document stuff

* directly expose batcher stuff in the sdk

* lints

* bruh

* more doc stuff i guess

---------

Co-authored-by: Jeremy Leibs <[email protected]>
  • Loading branch information
teh-cmc and jleibs authored May 15, 2023
1 parent 8f27fb8 commit ad4c38c
Show file tree
Hide file tree
Showing 33 changed files with 1,975 additions and 771 deletions.
7 changes: 1 addition & 6 deletions crates/re_sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@ demo = []
## Add support for some math operations using [`glam`](https://crates.io/crates/glam/).
glam = ["re_log_types/glam"]

## Add the `global_session` method.
global_session = ["dep:once_cell"]

## Integration with the [`image`](https://crates.io/crates/image/) crate.
image = ["re_log_types/image"]

Expand All @@ -44,12 +41,10 @@ re_sdk_comms = { workspace = true, features = ["client"] }
ahash.workspace = true
crossbeam.workspace = true
document-features = "0.2"
once_cell = "1.12"
parking_lot.workspace = true
thiserror.workspace = true

# Optional dependencies:
once_cell = { version = "1.12", optional = true }


[dev-dependencies]
arrow2_convert.workspace = true
Expand Down
333 changes: 333 additions & 0 deletions crates/re_sdk/src/global.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,333 @@
//! Keeps track of global and thread-local [`RecordingStream`]s and handles fallback logic between
//! them.
use std::cell::RefCell;

use once_cell::sync::OnceCell;
use parking_lot::RwLock;

use crate::{RecordingStream, RecordingType};

// ---

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
enum RecordingScope {
Global,
ThreadLocal,
}

impl std::fmt::Display for RecordingScope {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(match self {
RecordingScope::Global => "global",
RecordingScope::ThreadLocal => "thread-local",
})
}
}

// ---

static GLOBAL_DATA_RECORDING: OnceCell<RwLock<Option<RecordingStream>>> = OnceCell::new();
thread_local! {
static LOCAL_DATA_RECORDING: RefCell<Option<RecordingStream>> = RefCell::new(None);
}

static GLOBAL_BLUEPRINT_RECORDING: OnceCell<RwLock<Option<RecordingStream>>> = OnceCell::new();
thread_local! {
static LOCAL_BLUEPRINT_RECORDING: RefCell<Option<RecordingStream>> = RefCell::new(None);
}

impl RecordingStream {
/// Returns `overrides` if it exists, otherwise returns the most appropriate active recording
/// of the specified type (i.e. thread-local first, then global scope), if any.
#[inline]
pub fn get(
which: RecordingType,
overrides: Option<RecordingStream>,
) -> Option<RecordingStream> {
let rec = overrides.or_else(|| {
Self::get_any(RecordingScope::ThreadLocal, which)
.or_else(|| Self::get_any(RecordingScope::Global, which))
});

if rec.is_none() {
// NOTE: This is the one and only place where a warning about missing active recording
// should be printed, don't stutter!
re_log::warn_once!(
"There is no currently active {which} recording available \
for the current thread ({:?}): have you called `set_global()` and/or \
`set_thread_local()` first?",
std::thread::current().id(),
);
}

rec
}

// --- Global ---

/// Returns the currently active recording of the specified type in the global scope, if any.
#[inline]
pub fn global(which: RecordingType) -> Option<RecordingStream> {
Self::get_any(RecordingScope::Global, which)
}

/// Replaces the currently active recording of the specified type in the global scope with
/// the specified one.
///
/// Returns the previous one, if any.
#[inline]
pub fn set_global(
which: RecordingType,
rec: Option<RecordingStream>,
) -> Option<RecordingStream> {
Self::set_any(RecordingScope::Global, which, rec)
}

// --- Thread local ---

/// Returns the currently active recording of the specified type in the thread-local scope,
/// if any.
#[inline]
pub fn thread_local(which: RecordingType) -> Option<RecordingStream> {
Self::get_any(RecordingScope::ThreadLocal, which)
}

/// Replaces the currently active recording of the specified type in the thread-local scope
/// with the specified one.
#[inline]
pub fn set_thread_local(
which: RecordingType,
rec: Option<RecordingStream>,
) -> Option<RecordingStream> {
Self::set_any(RecordingScope::ThreadLocal, which, rec)
}

// --- Internal helpers ---

fn get_any(scope: RecordingScope, which: RecordingType) -> Option<RecordingStream> {
match which {
RecordingType::Data => match scope {
RecordingScope::Global => GLOBAL_DATA_RECORDING
.get_or_init(Default::default)
.read()
.clone(),
RecordingScope::ThreadLocal => {
LOCAL_DATA_RECORDING.with(|rec| rec.borrow().clone())
}
},
RecordingType::Blueprint => match scope {
RecordingScope::Global => GLOBAL_BLUEPRINT_RECORDING
.get_or_init(Default::default)
.read()
.clone(),
RecordingScope::ThreadLocal => {
LOCAL_BLUEPRINT_RECORDING.with(|rec| rec.borrow().clone())
}
},
}
}

fn set_any(
scope: RecordingScope,
which: RecordingType,
rec: Option<RecordingStream>,
) -> Option<RecordingStream> {
match which {
RecordingType::Data => match scope {
RecordingScope::Global => std::mem::replace(
&mut *GLOBAL_DATA_RECORDING.get_or_init(Default::default).write(),
rec,
),
RecordingScope::ThreadLocal => LOCAL_DATA_RECORDING.with(|cell| {
let mut cell = cell.borrow_mut();
std::mem::replace(&mut *cell, rec)
}),
},
RecordingType::Blueprint => match scope {
RecordingScope::Global => std::mem::replace(
&mut *GLOBAL_BLUEPRINT_RECORDING
.get_or_init(Default::default)
.write(),
rec,
),
RecordingScope::ThreadLocal => LOCAL_BLUEPRINT_RECORDING.with(|cell| {
let mut cell = cell.borrow_mut();
std::mem::replace(&mut *cell, rec)
}),
},
}
}
}

// ---

#[cfg(test)]
mod tests {
use crate::RecordingStreamBuilder;

use super::*;

#[test]
fn fallbacks() {
fn check_recording_id(expected: &RecordingStream, got: Option<RecordingStream>) {
assert_eq!(
expected.recording_info().unwrap().recording_id,
got.unwrap().recording_info().unwrap().recording_id
);
}

// nothing is set
assert!(RecordingStream::get(RecordingType::Data, None).is_none());
assert!(RecordingStream::get(RecordingType::Blueprint, None).is_none());

// nothing is set -- explicit wins
let explicit = RecordingStreamBuilder::new("explicit").buffered().unwrap();
check_recording_id(
&explicit,
RecordingStream::get(RecordingType::Data, explicit.clone().into()),
);
check_recording_id(
&explicit,
RecordingStream::get(RecordingType::Blueprint, explicit.clone().into()),
);

let global_data = RecordingStreamBuilder::new("global_data")
.buffered()
.unwrap();
assert!(
RecordingStream::set_global(RecordingType::Data, Some(global_data.clone())).is_none()
);

let global_blueprint = RecordingStreamBuilder::new("global_blueprint")
.buffered()
.unwrap();
assert!(RecordingStream::set_global(
RecordingType::Blueprint,
Some(global_blueprint.clone())
)
.is_none());

// globals are set, no explicit -- globals win
check_recording_id(
&global_data,
RecordingStream::get(RecordingType::Data, None),
);
check_recording_id(
&global_blueprint,
RecordingStream::get(RecordingType::Blueprint, None),
);

// overwrite globals with themselves -- we expect to get the same value back
check_recording_id(
&global_data,
RecordingStream::set_global(RecordingType::Data, Some(global_data.clone())),
);
check_recording_id(
&global_blueprint,
RecordingStream::set_global(RecordingType::Blueprint, Some(global_blueprint.clone())),
);

std::thread::Builder::new()
.spawn({
let global_data = global_data.clone();
let global_blueprint = global_blueprint.clone();
move || {
// globals are still set, no explicit -- globals still win
check_recording_id(
&global_data,
RecordingStream::get(RecordingType::Data, None),
);
check_recording_id(
&global_blueprint,
RecordingStream::get(RecordingType::Blueprint, None),
);

let local_data = RecordingStreamBuilder::new("local_data")
.buffered()
.unwrap();
assert!(RecordingStream::set_thread_local(
RecordingType::Data,
Some(local_data.clone())
)
.is_none());

let local_blueprint = RecordingStreamBuilder::new("local_blueprint")
.buffered()
.unwrap();
assert!(RecordingStream::set_thread_local(
RecordingType::Blueprint,
Some(local_blueprint.clone())
)
.is_none());

// locals are set for this thread -- locals win
check_recording_id(
&local_data,
RecordingStream::get(RecordingType::Data, None),
);
check_recording_id(
&local_blueprint,
RecordingStream::get(RecordingType::Blueprint, None),
);

// explicit still outsmarts everyone no matter what
check_recording_id(
&explicit,
RecordingStream::get(RecordingType::Data, explicit.clone().into()),
);
check_recording_id(
&explicit,
RecordingStream::get(RecordingType::Blueprint, explicit.clone().into()),
);
}
})
.unwrap()
.join()
.unwrap();

// locals should not exist in this thread -- global wins
check_recording_id(
&global_data,
RecordingStream::get(RecordingType::Data, None),
);
check_recording_id(
&global_blueprint,
RecordingStream::get(RecordingType::Blueprint, None),
);

let local_data = RecordingStreamBuilder::new("local_data")
.buffered()
.unwrap();
assert!(
RecordingStream::set_thread_local(RecordingType::Data, Some(local_data.clone()))
.is_none()
);

let local_blueprint = RecordingStreamBuilder::new("local_blueprint")
.buffered()
.unwrap();
assert!(RecordingStream::set_thread_local(
RecordingType::Blueprint,
Some(local_blueprint.clone())
)
.is_none());

check_recording_id(
&global_data,
RecordingStream::set_global(RecordingType::Data, None),
);
check_recording_id(
&global_blueprint,
RecordingStream::set_global(RecordingType::Blueprint, None),
);

// locals still win
check_recording_id(&local_data, RecordingStream::get(RecordingType::Data, None));
check_recording_id(
&local_blueprint,
RecordingStream::get(RecordingType::Blueprint, None),
);
}
}
10 changes: 7 additions & 3 deletions crates/re_sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
// ----------------
// Private modules:

mod global;
mod log_sink;
mod msg_sender;
mod recording_stream;
Expand All @@ -19,11 +20,11 @@ mod recording_stream;
pub use self::msg_sender::{MsgSender, MsgSenderError};
pub use self::recording_stream::{RecordingStream, RecordingStreamBuilder};

use re_log_types::RecordingType;
pub use re_sdk_comms::default_server_addr;

pub use re_log_types::{
ApplicationId, Component, ComponentName, EntityPath, RecordingId, SerializableComponent,
ApplicationId, Component, ComponentName, EntityPath, RecordingId, RecordingType,
SerializableComponent,
};

#[cfg(not(target_arch = "wasm32"))]
Expand Down Expand Up @@ -55,7 +56,10 @@ pub mod sink {

/// Things directly related to logging.
pub mod log {
pub use re_log_types::{DataCell, DataRow, DataTable, LogMsg, PathOp, RowId, TableId};
pub use re_log_types::{
DataCell, DataRow, DataTable, DataTableBatcher, DataTableBatcherConfig, LogMsg, PathOp,
RowId, TableId,
};
}

/// Time-related types.
Expand Down
Loading

1 comment on commit ad4c38c

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark 'Rust Benchmark'.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 1.25.

Benchmark suite Current: ad4c38c Previous: 8f27fb8 Ratio
mono_points_arrow_batched/generate_message_bundles 28778309 ns/iter (± 1132085) 18998675 ns/iter (± 804943) 1.51
mono_points_arrow_batched/generate_messages 5777618 ns/iter (± 1028371) 4038161 ns/iter (± 144447) 1.43
mono_points_arrow_batched/encode_total 36619529 ns/iter (± 1755489) 25060410 ns/iter (± 1250525) 1.46

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.