Skip to content

Commit

Permalink
Fix crash when using RecordingStream::set_thread_local on macOS (#3929
Browse files Browse the repository at this point in the history
)

### What
* Closes #3558
* Closes #2889
* Opens #3937

### Checklist
* [x] I have read and agree to [Contributor
Guide](https://github.com/rerun-io/rerun/blob/main/CONTRIBUTING.md) and
the [Code of
Conduct](https://github.com/rerun-io/rerun/blob/main/CODE_OF_CONDUCT.md)
* [x] I've included a screenshot or gif (if applicable)
* [x] I have tested [demo.rerun.io](https://demo.rerun.io/pr/3929) (if
applicable)
* [x] The PR title and labels are set such as to maximize their
usefulness for the next release's CHANGELOG

- [PR Build Summary](https://build.rerun.io/pr/3929)
- [Docs
preview](https://rerun.io/preview/2b1469294de742102f69f16f6ecb8dc0e3e3a0c5/docs)
<!--DOCS-PREVIEW-->
- [Examples
preview](https://rerun.io/preview/2b1469294de742102f69f16f6ecb8dc0e3e3a0c5/examples)
<!--EXAMPLES-PREVIEW-->
- [Recent benchmark results](https://ref.rerun.io/dev/bench/)
- [Wasm size tracking](https://ref.rerun.io/dev/sizes/)

---------

Co-authored-by: Andreas Reich <[email protected]>
  • Loading branch information
emilk and Wumpf authored Oct 20, 2023
1 parent 31537ef commit a8bb2f1
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 20 deletions.
67 changes: 47 additions & 20 deletions crates/re_sdk/src/global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,49 @@ impl std::fmt::Display for RecordingScope {

// ---

/// Required to work-around <https://github.com/rerun-io/rerun/issues/2889>
#[derive(Default)]
struct ThreadLocalRecording {
stream: Option<RecordingStream>,
}

impl ThreadLocalRecording {
fn replace(&mut self, stream: Option<RecordingStream>) -> Option<RecordingStream> {
std::mem::replace(&mut self.stream, stream)
}

fn get(&self) -> Option<RecordingStream> {
self.stream.clone()
}
}

#[cfg(target_os = "macos")]
impl Drop for ThreadLocalRecording {
fn drop(&mut self) {
if let Some(stream) = self.stream.take() {
// Work-around for https://github.com/rerun-io/rerun/issues/2889
// Calling drop on `self.stream` will panic the calling thread.
// But we want to make sure we don't loose the data in the stream.
// So how?
re_log::warn!("Using thread-local RecordingStream on macOS can result in data loss because of https://github.com/rerun-io/rerun/issues/3937");

// Give the batcher and sink threads a chance to process the data.
std::thread::sleep(std::time::Duration::from_millis(500));

#[allow(clippy::mem_forget)] // Intentionally not calling `drop`
std::mem::forget(stream);
}
}
}

static GLOBAL_DATA_RECORDING: OnceCell<RwLock<Option<RecordingStream>>> = OnceCell::new();
thread_local! {
static LOCAL_DATA_RECORDING: RefCell<Option<RecordingStream>> = RefCell::new(None);
static LOCAL_DATA_RECORDING: RefCell<ThreadLocalRecording> = Default::default();
}

static GLOBAL_BLUEPRINT_RECORDING: OnceCell<RwLock<Option<RecordingStream>>> = OnceCell::new();
thread_local! {
static LOCAL_BLUEPRINT_RECORDING: RefCell<Option<RecordingStream>> = RefCell::new(None);
static LOCAL_BLUEPRINT_RECORDING: RefCell<ThreadLocalRecording> = Default::default();
}

/// Check whether we are the child of a fork.
Expand Down Expand Up @@ -187,17 +222,15 @@ impl RecordingStream {
.get_or_init(Default::default)
.read()
.clone(),
RecordingScope::ThreadLocal => {
LOCAL_DATA_RECORDING.with(|rec| rec.borrow().clone())
}
RecordingScope::ThreadLocal => LOCAL_DATA_RECORDING.with(|rec| rec.borrow().get()),
},
StoreKind::Blueprint => match scope {
RecordingScope::Global => GLOBAL_BLUEPRINT_RECORDING
.get_or_init(Default::default)
.read()
.clone(),
RecordingScope::ThreadLocal => {
LOCAL_BLUEPRINT_RECORDING.with(|rec| rec.borrow().clone())
LOCAL_BLUEPRINT_RECORDING.with(|rec| rec.borrow().get())
}
},
}
Expand All @@ -214,10 +247,9 @@ impl RecordingStream {
&mut *GLOBAL_DATA_RECORDING.get_or_init(Default::default).write(),
rec,
),
RecordingScope::ThreadLocal => LOCAL_DATA_RECORDING.with(|cell| {
let mut cell = cell.borrow_mut();
std::mem::replace(&mut *cell, rec)
}),
RecordingScope::ThreadLocal => {
LOCAL_DATA_RECORDING.with(|cell| cell.borrow_mut().replace(rec))
}
},
StoreKind::Blueprint => match scope {
RecordingScope::Global => std::mem::replace(
Expand All @@ -226,10 +258,9 @@ impl RecordingStream {
.write(),
rec,
),
RecordingScope::ThreadLocal => LOCAL_BLUEPRINT_RECORDING.with(|cell| {
let mut cell = cell.borrow_mut();
std::mem::replace(&mut *cell, rec)
}),
RecordingScope::ThreadLocal => {
LOCAL_BLUEPRINT_RECORDING.with(|cell| cell.borrow_mut().replace(rec))
}
},
}
}
Expand All @@ -244,9 +275,7 @@ impl RecordingStream {
}
}
RecordingScope::ThreadLocal => LOCAL_DATA_RECORDING.with(|cell| {
if let Some(cell) = cell.take() {
std::mem::forget(cell);
}
std::mem::forget(cell.take());
}),
},
StoreKind::Blueprint => match scope {
Expand All @@ -256,9 +285,7 @@ impl RecordingStream {
}
}
RecordingScope::ThreadLocal => LOCAL_BLUEPRINT_RECORDING.with(|cell| {
if let Some(cell) = cell.take() {
std::mem::forget(cell);
}
std::mem::forget(cell.take());
}),
},
}
Expand Down
16 changes: 16 additions & 0 deletions crates/re_sdk/src/recording_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1620,4 +1620,20 @@ mod tests {
// That's all.
assert!(msgs.pop().is_none());
}

#[test]
fn test_set_thread_local() {
// Regression-test for https://github.com/rerun-io/rerun/issues/2889
std::thread::Builder::new()
.name("test_thead".to_owned())
.spawn(|| {
let stream = RecordingStreamBuilder::new("rerun_example_test")
.buffered()
.unwrap();
RecordingStream::set_thread_local(StoreKind::Recording, Some(stream));
})
.unwrap()
.join()
.unwrap();
}
}

0 comments on commit a8bb2f1

Please sign in to comment.