Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc committed May 11, 2023
1 parent 1ae501b commit d473cd6
Show file tree
Hide file tree
Showing 32 changed files with 1,870 additions and 764 deletions.
7 changes: 1 addition & 6 deletions crates/re_sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@ demo = []
## Add support for some math operations using [`glam`](https://crates.io/crates/glam/).
glam = ["re_log_types/glam"]

## Add the `global_session` method.
global_session = ["dep:once_cell"]

## Integration with the [`image`](https://crates.io/crates/image/) crate.
image = ["re_log_types/image"]

Expand All @@ -44,12 +41,10 @@ re_sdk_comms = { workspace = true, features = ["client"] }
ahash.workspace = true
crossbeam.workspace = true
document-features = "0.2"
once_cell = "1.12"
parking_lot.workspace = true
thiserror.workspace = true

# Optional dependencies:
once_cell = { version = "1.12", optional = true }


[dev-dependencies]
arrow2_convert.workspace = true
Expand Down
360 changes: 360 additions & 0 deletions crates/re_sdk/src/global.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,360 @@
//! Keeps track of global and thread-local [`RecordingStream`]s and handles fallback logic between
//! them.
use std::cell::RefCell;

use once_cell::sync::OnceCell;
use parking_lot::RwLock;

use crate::RecordingStream;

// ---

// TODO: use Jeremy's
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
#[allow(missing_docs)]
pub enum RecordingType {
Unknown,
Data,
Blueprint,
}

impl std::fmt::Display for RecordingType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(match self {
RecordingType::Unknown => "unknown",
RecordingType::Data => "data",
RecordingType::Blueprint => "blueprint",
})
}
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
enum RecordingScope {
Global,
ThreadLocal,
}

impl std::fmt::Display for RecordingScope {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(match self {
RecordingScope::Global => "global",
RecordingScope::ThreadLocal => "thread-local",
})
}
}

// ---

static GLOBAL_DATA_RECORDING: OnceCell<RwLock<Option<RecordingStream>>> = OnceCell::new();
thread_local! {
static LOCAL_DATA_RECORDING: RefCell<Option<RecordingStream>> = RefCell::new(None);
}

static GLOBAL_BLUEPRINT_RECORDING: OnceCell<RwLock<Option<RecordingStream>>> = OnceCell::new();
thread_local! {
static LOCAL_BLUEPRINT_RECORDING: RefCell<Option<RecordingStream>> = RefCell::new(None);
}

impl RecordingStream {
/// Returns `overrides` if it exists, otherwise returns the most appropriate active recording
/// of the specified type (i.e. thread-local first, then global scope), if any.
#[inline]
pub fn get(
which: RecordingType,
overrides: Option<RecordingStream>,
) -> Option<RecordingStream> {
let rec = overrides.or_else(|| {
Self::get_any(RecordingScope::ThreadLocal, which)
.or_else(|| Self::get_any(RecordingScope::Global, which))
});

if rec.is_none() {
// NOTE: This is the one and only place where a warning about missing active recording
// should be printed, don't stutter!
re_log::warn_once!(
"There is no currently active {which} recording available \
for the current thread ({:?}): have you called `set_global()` and/or \
`set_thread_local()` first?",
std::thread::current().id(),
);
}

rec
}

// --- Global ---

/// Returns the currently active recording of the specified type in the global scope, if any.
#[inline]
pub fn global(which: RecordingType) -> Option<RecordingStream> {
Self::get_any(RecordingScope::Global, which)
}

/// Replaces the currently active recording of the specified type in the global scope with
/// the specified one.
///
/// Returns the previous one, if any.
#[inline]
pub fn set_global(
which: RecordingType,
rec: Option<RecordingStream>,
) -> Option<RecordingStream> {
Self::set(RecordingScope::Global, which, rec)
}

// --- Thread local ---

/// Returns the currently active recording of the specified type in the thread-local scope,
/// if any.
#[inline]
pub fn thread_local(which: RecordingType) -> Option<RecordingStream> {
Self::get_any(RecordingScope::ThreadLocal, which)
}

/// Replaces the currently active recording of the specified type in the thread-local scope
/// with the specified one.
#[inline]
pub fn set_thread_local(
which: RecordingType,
rec: Option<RecordingStream>,
) -> Option<RecordingStream> {
Self::set(RecordingScope::ThreadLocal, which, rec)
}

// --- Internal helpers ---

fn get_any(scope: RecordingScope, which: RecordingType) -> Option<RecordingStream> {
match which {
RecordingType::Unknown => None,
RecordingType::Data => match scope {
RecordingScope::Global => GLOBAL_DATA_RECORDING
.get_or_init(Default::default)
.read()
.clone(),
RecordingScope::ThreadLocal => {
LOCAL_DATA_RECORDING.with(|rec| rec.borrow().clone())
}
},
RecordingType::Blueprint => match scope {
RecordingScope::Global => GLOBAL_BLUEPRINT_RECORDING
.get_or_init(Default::default)
.read()
.clone(),
RecordingScope::ThreadLocal => {
LOCAL_BLUEPRINT_RECORDING.with(|rec| rec.borrow().clone())
}
},
}
}

fn set(
scope: RecordingScope,
which: RecordingType,
rec: Option<RecordingStream>,
) -> Option<RecordingStream> {
match which {
RecordingType::Unknown => None,
RecordingType::Data => match scope {
RecordingScope::Global => std::mem::replace(
&mut *GLOBAL_DATA_RECORDING.get_or_init(Default::default).write(),
rec,
),
RecordingScope::ThreadLocal => LOCAL_DATA_RECORDING.with(|cell| {
let mut cell = cell.borrow_mut();
std::mem::replace(&mut *cell, rec)
}),
},
RecordingType::Blueprint => match scope {
RecordingScope::Global => std::mem::replace(
&mut *GLOBAL_BLUEPRINT_RECORDING
.get_or_init(Default::default)
.write(),
rec,
),
RecordingScope::ThreadLocal => LOCAL_BLUEPRINT_RECORDING.with(|cell| {
let mut cell = cell.borrow_mut();
std::mem::replace(&mut *cell, rec)
}),
},
}
}
}

// ---

#[cfg(test)]
mod tests {
use crate::RecordingStreamBuilder;

use super::*;

#[test]
fn fallbacks() {
fn check_recording_id(expected: &RecordingStream, got: Option<RecordingStream>) {
assert_eq!(
expected.recording_info().unwrap().recording_id,
got.unwrap().recording_info().unwrap().recording_id
);
}

// nothing is set
assert!(RecordingStream::get(RecordingType::Data, None).is_none());
assert!(RecordingStream::get(RecordingType::Blueprint, None).is_none());
assert!(RecordingStream::get(RecordingType::Unknown, None).is_none());

// nothing is set -- explicit wins
let explicit = RecordingStreamBuilder::new("explicit").buffered().unwrap();
check_recording_id(
&explicit,
RecordingStream::get(RecordingType::Data, explicit.clone().into()),
);
check_recording_id(
&explicit,
RecordingStream::get(RecordingType::Blueprint, explicit.clone().into()),
);
check_recording_id(
&explicit,
RecordingStream::get(RecordingType::Unknown, explicit.clone().into()),
);

let global_data = RecordingStreamBuilder::new("global_data")
.buffered()
.unwrap();
assert!(
RecordingStream::set_global(RecordingType::Data, Some(global_data.clone())).is_none()
);

let global_blueprint = RecordingStreamBuilder::new("global_blueprint")
.buffered()
.unwrap();
assert!(RecordingStream::set_global(
RecordingType::Blueprint,
Some(global_blueprint.clone())
)
.is_none());

// globals are set, no explicit -- globals win
check_recording_id(
&global_data,
RecordingStream::get(RecordingType::Data, None),
);
check_recording_id(
&global_blueprint,
RecordingStream::get(RecordingType::Blueprint, None),
);

// overwrite globals with themselves -- we expect to get the same value back
check_recording_id(
&global_data,
RecordingStream::set_global(RecordingType::Data, Some(global_data.clone())),
);
check_recording_id(
&global_blueprint,
RecordingStream::set_global(RecordingType::Blueprint, Some(global_blueprint.clone())),
);

std::thread::Builder::new()
.spawn({
let global_data = global_data.clone();
let global_blueprint = global_blueprint.clone();
move || {
// globals are still set, no explicit -- globals still win
check_recording_id(
&global_data,
RecordingStream::get(RecordingType::Data, None),
);
check_recording_id(
&global_blueprint,
RecordingStream::get(RecordingType::Blueprint, None),
);

let local_data = RecordingStreamBuilder::new("local_data")
.buffered()
.unwrap();
assert!(RecordingStream::set_thread_local(
RecordingType::Data,
Some(local_data.clone())
)
.is_none());

let local_blueprint = RecordingStreamBuilder::new("local_blueprint")
.buffered()
.unwrap();
assert!(RecordingStream::set_thread_local(
RecordingType::Blueprint,
Some(local_blueprint.clone())
)
.is_none());

// locals are set for this thread -- locals win
check_recording_id(
&local_data,
RecordingStream::get(RecordingType::Data, None),
);
check_recording_id(
&local_blueprint,
RecordingStream::get(RecordingType::Blueprint, None),
);

// explicit still outsmarts everyone no matter what
check_recording_id(
&explicit,
RecordingStream::get(RecordingType::Data, explicit.clone().into()),
);
check_recording_id(
&explicit,
RecordingStream::get(RecordingType::Blueprint, explicit.clone().into()),
);
}
})
.unwrap()
.join()
.unwrap();

// locals should not exist in this thread -- global wins
check_recording_id(
&global_data,
RecordingStream::get(RecordingType::Data, None),
);
check_recording_id(
&global_blueprint,
RecordingStream::get(RecordingType::Blueprint, None),
);

let local_data = RecordingStreamBuilder::new("local_data")
.buffered()
.unwrap();
assert!(
RecordingStream::set_thread_local(RecordingType::Data, Some(local_data.clone()))
.is_none()
);

let local_blueprint = RecordingStreamBuilder::new("local_blueprint")
.buffered()
.unwrap();
assert!(RecordingStream::set_thread_local(
RecordingType::Blueprint,
Some(local_blueprint.clone())
)
.is_none());

check_recording_id(
&global_data,
RecordingStream::set_global(RecordingType::Data, None),
);
check_recording_id(
&global_blueprint,
RecordingStream::set_global(RecordingType::Blueprint, None),
);

// locals still win
check_recording_id(&local_data, RecordingStream::get(RecordingType::Data, None));
check_recording_id(
&local_blueprint,
RecordingStream::get(RecordingType::Blueprint, None),
);
}
}
Loading

0 comments on commit d473cd6

Please sign in to comment.