Skip to content

Commit

Permalink
Yes, we still need the ALL_RECORDINGS business
Browse files Browse the repository at this point in the history
This reverts commit 29beb4b.
  • Loading branch information
teh-cmc committed Dec 22, 2023
1 parent d47a124 commit eaf8383
Showing 1 changed file with 40 additions and 3 deletions.
43 changes: 40 additions & 3 deletions rerun_py/src/python_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#![allow(clippy::borrow_deref_ref)] // False positive due to #[pyfunction] macro
#![allow(unsafe_op_in_unsafe_fn)] // False positive due to #[pyfunction] macro

use std::collections::HashMap;
use std::path::PathBuf;

use itertools::Itertools;
Expand Down Expand Up @@ -35,7 +36,19 @@ use re_ws_comms::RerunServerPort;

// --- FFI ---

use once_cell::sync::Lazy;
use once_cell::sync::{Lazy, OnceCell};

// The bridge needs to have complete control over the lifetimes of the individual recordings,
// otherwise all the recording shutdown machinery (which includes deallocating C, Rust and Python
// data and joining a bunch of threads) can end up running at any time depending on what the
// Python GC is doing, which obviously leads to very bad things :tm:.
//
// TODO(#2116): drop unused recordings
fn all_recordings() -> parking_lot::MutexGuard<'static, HashMap<StoreId, RecordingStream>> {
static ALL_RECORDINGS: OnceCell<parking_lot::Mutex<HashMap<StoreId, RecordingStream>>> =
OnceCell::new();
ALL_RECORDINGS.get_or_init(Default::default).lock()
}

// An Arrow chunk of type-erased arrays — the unit of data queued for deferred
// deallocation (see `flush_garbage_queue` below).
type GarbageChunk = arrow2::chunk::Chunk<Box<dyn arrow2::array::Array>>;
// Sending half of the crossbeam channel used to hand `GarbageChunk`s off for later cleanup.
type GarbageSender = crossbeam::channel::Sender<GarbageChunk>;
Expand Down Expand Up @@ -82,7 +95,6 @@ fn flush_garbage_queue() {
#[cfg(feature = "web_viewer")]
fn global_web_viewer_server(
) -> parking_lot::MutexGuard<'static, Option<re_web_viewer_server::WebViewerServerHandle>> {
use once_cell::sync::OnceCell;
static WEB_HANDLE: OnceCell<
parking_lot::Mutex<Option<re_web_viewer_server::WebViewerServerHandle>>,
> = OnceCell::new();
Expand Down Expand Up @@ -263,6 +275,9 @@ fn new_recording(
);
}

// NOTE: The Rust-side of the bindings must be in control of the lifetimes of the recordings!
all_recordings().insert(recording_id, recording.clone());

Ok(PyRecordingStream(recording))
}

Expand Down Expand Up @@ -316,12 +331,22 @@ fn new_blueprint(
);
}

// NOTE: The Rust-side of the bindings must be in control of the lifetimes of the recordings!
all_recordings().insert(blueprint_id, blueprint.clone());

Ok(PyRecordingStream(blueprint))
}

#[pyfunction]
fn shutdown() {
fn shutdown(py: Python<'_>) {
re_log::debug!("Shutting down the Rerun SDK");
// Release the GIL in case any flushing behavior needs to cleanup a python object.
py.allow_threads(|| {
for (_, recording) in all_recordings().drain() {
recording.disconnect();
}
flush_garbage_queue();
});
}

// --- Recordings ---
Expand Down Expand Up @@ -388,6 +413,9 @@ fn set_global_data_recording(
// to zero, which means dropping it, which means flushing it, which potentially means
// deallocating python-owned data, which means grabbing the GIL, thus we need to release the
// GIL first.
//
// NOTE: This cannot happen anymore with the new `ALL_RECORDINGS` thingy, but better safe than
// sorry.
py.allow_threads(|| {
let rec = RecordingStream::set_global(
rerun::StoreKind::Recording,
Expand Down Expand Up @@ -417,6 +445,9 @@ fn set_thread_local_data_recording(
// to zero, which means dropping it, which means flushing it, which potentially means
// deallocating python-owned data, which means grabbing the GIL, thus we need to release the
// GIL first.
//
// NOTE: This cannot happen anymore with the new `ALL_RECORDINGS` thingy, but better safe than
// sorry.
py.allow_threads(|| {
let rec = RecordingStream::set_thread_local(
rerun::StoreKind::Recording,
Expand Down Expand Up @@ -457,6 +488,9 @@ fn set_global_blueprint_recording(
// to zero, which means dropping it, which means flushing it, which potentially means
// deallocating python-owned blueprint, which means grabbing the GIL, thus we need to release the
// GIL first.
//
// NOTE: This cannot happen anymore with the new `ALL_RECORDINGS` thingy, but better safe than
// sorry.
py.allow_threads(|| {
let rec = RecordingStream::set_global(
rerun::StoreKind::Blueprint,
Expand Down Expand Up @@ -486,6 +520,9 @@ fn set_thread_local_blueprint_recording(
// to zero, which means dropping it, which means flushing it, which potentially means
// deallocating python-owned blueprint, which means grabbing the GIL, thus we need to release the
// GIL first.
//
// NOTE: This cannot happen anymore with the new `ALL_RECORDINGS` thingy, but better safe than
// sorry.
py.allow_threads(|| {
let rec = RecordingStream::set_thread_local(
rerun::StoreKind::Blueprint,
Expand Down

0 comments on commit eaf8383

Please sign in to comment.