Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python SDK: multi-recording & multi-threading redesign #2061

Merged
merged 26 commits into from
May 15, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
4357474
implement multi-recording support via RecordingStream
teh-cmc May 11, 2023
e3e9a91
please get me out of here
teh-cmc May 11, 2023
6f21c50
random fixes
teh-cmc May 11, 2023
c8c9d61
no need to flush manually for notebooks anymore
teh-cmc May 11, 2023
7b737e6
jupyter deadlock from hell
teh-cmc May 11, 2023
1448c84
tornado >6.1 doesn't work with recent jupyters
teh-cmc May 11, 2023
397a5b9
Merge remote-tracking branch 'origin/main' into cmc/py/recordingstrea…
teh-cmc May 12, 2023
b14f2ba
Make every RecordingId typed and preclude the existence of 'Defaults'
jleibs May 12, 2023
ab9eb67
docstrings
jleibs May 12, 2023
68ea69a
Merge branch 'main' into cmc/py/recordingstream_exposed
teh-cmc May 15, 2023
6cff3d3
Merge branch 'main' into jleibs/explicit_recording_ids
teh-cmc May 15, 2023
8b9f4f0
addressing minor PR comments
teh-cmc May 15, 2023
cf457c8
RecordingStream in charge of Goodbye payloads
teh-cmc May 15, 2023
7280db7
fmt
teh-cmc May 15, 2023
0a9af05
Merge remote-tracking branch 'origin/jleibs/explicit_recording_ids' i…
teh-cmc May 15, 2023
e060b77
addressed PR comments
teh-cmc May 15, 2023
35782c5
say goodbye when gracefully disconnecting
teh-cmc May 15, 2023
5bacc1e
Merge branch 'jleibs/explicit_recording_ids' into cmc/py/recordingstr…
teh-cmc May 15, 2023
d3afddb
maintain status quo regarding opened recordings
teh-cmc May 15, 2023
59f79cc
init shall not return a recording
teh-cmc May 15, 2023
7f62883
tryin to document stuff
teh-cmc May 15, 2023
18a7403
directly expose batcher stuff in the sdk
teh-cmc May 15, 2023
58dd756
lints
teh-cmc May 15, 2023
ff4bcd7
bruh
teh-cmc May 15, 2023
f9383e6
more doc stuff i guess
teh-cmc May 15, 2023
56049ca
Merge remote-tracking branch 'origin/main' into cmc/py/recordingstrea…
teh-cmc May 15, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions crates/re_sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@ demo = []
## Add support for some math operations using [`glam`](https://crates.io/crates/glam/).
glam = ["re_log_types/glam"]

## Add the `global_session` method.
global_session = ["dep:once_cell"]

## Integration with the [`image`](https://crates.io/crates/image/) crate.
image = ["re_log_types/image"]

Expand All @@ -44,12 +41,10 @@ re_sdk_comms = { workspace = true, features = ["client"] }
ahash.workspace = true
crossbeam.workspace = true
document-features = "0.2"
once_cell = "1.12"
parking_lot.workspace = true
thiserror.workspace = true

# Optional dependencies:
once_cell = { version = "1.12", optional = true }


[dev-dependencies]
arrow2_convert.workspace = true
Expand Down
360 changes: 360 additions & 0 deletions crates/re_sdk/src/global.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,360 @@
//! Keeps track of global and thread-local [`RecordingStream`]s and handles fallback logic between
//! them.

use std::cell::RefCell;

use once_cell::sync::OnceCell;
use parking_lot::RwLock;

use crate::RecordingStream;

// ---

// TODO: use Jeremy's
teh-cmc marked this conversation as resolved.
Show resolved Hide resolved
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
#[allow(missing_docs)]
pub enum RecordingType {
Unknown,
Data,
Blueprint,
}

impl std::fmt::Display for RecordingType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(match self {
RecordingType::Unknown => "unknown",
RecordingType::Data => "data",
RecordingType::Blueprint => "blueprint",
})
}
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
enum RecordingScope {
Global,
ThreadLocal,
}

impl std::fmt::Display for RecordingScope {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(match self {
RecordingScope::Global => "global",
RecordingScope::ThreadLocal => "thread-local",
})
}
}

// ---

static GLOBAL_DATA_RECORDING: OnceCell<RwLock<Option<RecordingStream>>> = OnceCell::new();
thread_local! {
static LOCAL_DATA_RECORDING: RefCell<Option<RecordingStream>> = RefCell::new(None);
}

static GLOBAL_BLUEPRINT_RECORDING: OnceCell<RwLock<Option<RecordingStream>>> = OnceCell::new();
thread_local! {
static LOCAL_BLUEPRINT_RECORDING: RefCell<Option<RecordingStream>> = RefCell::new(None);
}

impl RecordingStream {
/// Returns `overrides` if it exists, otherwise returns the most appropriate active recording
/// of the specified type (i.e. thread-local first, then global scope), if any.
#[inline]
pub fn get(
which: RecordingType,
overrides: Option<RecordingStream>,
) -> Option<RecordingStream> {
teh-cmc marked this conversation as resolved.
Show resolved Hide resolved
let rec = overrides.or_else(|| {
Self::get_any(RecordingScope::ThreadLocal, which)
.or_else(|| Self::get_any(RecordingScope::Global, which))
});

if rec.is_none() {
// NOTE: This is the one and only place where a warning about missing active recording
// should be printed, don't stutter!
re_log::warn_once!(
"There is no currently active {which} recording available \
for the current thread ({:?}): have you called `set_global()` and/or \
`set_thread_local()` first?",
std::thread::current().id(),
);
}

rec
}

// --- Global ---

/// Returns the currently active recording of the specified type in the global scope, if any.
#[inline]
pub fn global(which: RecordingType) -> Option<RecordingStream> {
Self::get_any(RecordingScope::Global, which)
}

/// Replaces the currently active recording of the specified type in the global scope with
/// the specified one.
///
/// Returns the previous one, if any.
#[inline]
pub fn set_global(
which: RecordingType,
rec: Option<RecordingStream>,
) -> Option<RecordingStream> {
Self::set(RecordingScope::Global, which, rec)
}

// --- Thread local ---

/// Returns the currently active recording of the specified type in the thread-local scope,
/// if any.
#[inline]
pub fn thread_local(which: RecordingType) -> Option<RecordingStream> {
Self::get_any(RecordingScope::ThreadLocal, which)
}

/// Replaces the currently active recording of the specified type in the thread-local scope
/// with the specified one.
#[inline]
pub fn set_thread_local(
which: RecordingType,
rec: Option<RecordingStream>,
) -> Option<RecordingStream> {
Self::set(RecordingScope::ThreadLocal, which, rec)
}

// --- Internal helpers ---

fn get_any(scope: RecordingScope, which: RecordingType) -> Option<RecordingStream> {
match which {
RecordingType::Unknown => None,
RecordingType::Data => match scope {
RecordingScope::Global => GLOBAL_DATA_RECORDING
.get_or_init(Default::default)
.read()
.clone(),
RecordingScope::ThreadLocal => {
LOCAL_DATA_RECORDING.with(|rec| rec.borrow().clone())
}
},
RecordingType::Blueprint => match scope {
RecordingScope::Global => GLOBAL_BLUEPRINT_RECORDING
.get_or_init(Default::default)
.read()
.clone(),
RecordingScope::ThreadLocal => {
LOCAL_BLUEPRINT_RECORDING.with(|rec| rec.borrow().clone())
}
},
}
}

fn set(
teh-cmc marked this conversation as resolved.
Show resolved Hide resolved
scope: RecordingScope,
which: RecordingType,
rec: Option<RecordingStream>,
) -> Option<RecordingStream> {
match which {
RecordingType::Unknown => None,
RecordingType::Data => match scope {
RecordingScope::Global => std::mem::replace(
&mut *GLOBAL_DATA_RECORDING.get_or_init(Default::default).write(),
rec,
),
RecordingScope::ThreadLocal => LOCAL_DATA_RECORDING.with(|cell| {
let mut cell = cell.borrow_mut();
std::mem::replace(&mut *cell, rec)
}),
},
RecordingType::Blueprint => match scope {
RecordingScope::Global => std::mem::replace(
&mut *GLOBAL_BLUEPRINT_RECORDING
.get_or_init(Default::default)
.write(),
rec,
),
RecordingScope::ThreadLocal => LOCAL_BLUEPRINT_RECORDING.with(|cell| {
let mut cell = cell.borrow_mut();
std::mem::replace(&mut *cell, rec)
}),
},
}
}
}
teh-cmc marked this conversation as resolved.
Show resolved Hide resolved

// ---

#[cfg(test)]
mod tests {
use crate::RecordingStreamBuilder;

use super::*;

#[test]
fn fallbacks() {
fn check_recording_id(expected: &RecordingStream, got: Option<RecordingStream>) {
assert_eq!(
expected.recording_info().unwrap().recording_id,
got.unwrap().recording_info().unwrap().recording_id
);
}

// nothing is set
assert!(RecordingStream::get(RecordingType::Data, None).is_none());
assert!(RecordingStream::get(RecordingType::Blueprint, None).is_none());
assert!(RecordingStream::get(RecordingType::Unknown, None).is_none());

// nothing is set -- explicit wins
let explicit = RecordingStreamBuilder::new("explicit").buffered().unwrap();
check_recording_id(
&explicit,
RecordingStream::get(RecordingType::Data, explicit.clone().into()),
);
check_recording_id(
&explicit,
RecordingStream::get(RecordingType::Blueprint, explicit.clone().into()),
);
check_recording_id(
&explicit,
RecordingStream::get(RecordingType::Unknown, explicit.clone().into()),
);

let global_data = RecordingStreamBuilder::new("global_data")
.buffered()
.unwrap();
assert!(
RecordingStream::set_global(RecordingType::Data, Some(global_data.clone())).is_none()
);

let global_blueprint = RecordingStreamBuilder::new("global_blueprint")
.buffered()
.unwrap();
assert!(RecordingStream::set_global(
RecordingType::Blueprint,
Some(global_blueprint.clone())
)
.is_none());

// globals are set, no explicit -- globals win
check_recording_id(
&global_data,
RecordingStream::get(RecordingType::Data, None),
);
check_recording_id(
&global_blueprint,
RecordingStream::get(RecordingType::Blueprint, None),
);

// overwrite globals with themselves -- we expect to get the same value back
check_recording_id(
&global_data,
RecordingStream::set_global(RecordingType::Data, Some(global_data.clone())),
);
check_recording_id(
&global_blueprint,
RecordingStream::set_global(RecordingType::Blueprint, Some(global_blueprint.clone())),
);

std::thread::Builder::new()
.spawn({
let global_data = global_data.clone();
let global_blueprint = global_blueprint.clone();
move || {
// globals are still set, no explicit -- globals still win
check_recording_id(
&global_data,
RecordingStream::get(RecordingType::Data, None),
);
check_recording_id(
&global_blueprint,
RecordingStream::get(RecordingType::Blueprint, None),
);

let local_data = RecordingStreamBuilder::new("local_data")
.buffered()
.unwrap();
assert!(RecordingStream::set_thread_local(
RecordingType::Data,
Some(local_data.clone())
)
.is_none());

let local_blueprint = RecordingStreamBuilder::new("local_blueprint")
.buffered()
.unwrap();
assert!(RecordingStream::set_thread_local(
RecordingType::Blueprint,
Some(local_blueprint.clone())
)
.is_none());

// locals are set for this thread -- locals win
check_recording_id(
&local_data,
RecordingStream::get(RecordingType::Data, None),
);
check_recording_id(
&local_blueprint,
RecordingStream::get(RecordingType::Blueprint, None),
);

// explicit still outsmarts everyone no matter what
check_recording_id(
&explicit,
RecordingStream::get(RecordingType::Data, explicit.clone().into()),
);
check_recording_id(
&explicit,
RecordingStream::get(RecordingType::Blueprint, explicit.clone().into()),
);
}
})
.unwrap()
.join()
.unwrap();

// locals should not exist in this thread -- global wins
check_recording_id(
&global_data,
RecordingStream::get(RecordingType::Data, None),
);
check_recording_id(
&global_blueprint,
RecordingStream::get(RecordingType::Blueprint, None),
);

let local_data = RecordingStreamBuilder::new("local_data")
.buffered()
.unwrap();
assert!(
RecordingStream::set_thread_local(RecordingType::Data, Some(local_data.clone()))
.is_none()
);

let local_blueprint = RecordingStreamBuilder::new("local_blueprint")
.buffered()
.unwrap();
assert!(RecordingStream::set_thread_local(
RecordingType::Blueprint,
Some(local_blueprint.clone())
)
.is_none());

check_recording_id(
&global_data,
RecordingStream::set_global(RecordingType::Data, None),
);
check_recording_id(
&global_blueprint,
RecordingStream::set_global(RecordingType::Blueprint, None),
);

// locals still win
check_recording_id(&local_data, RecordingStream::get(RecordingType::Data, None));
check_recording_id(
&local_blueprint,
RecordingStream::get(RecordingType::Blueprint, None),
);
}
}
Loading