Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix possible hang when using torch.multiprocessing #6271

Merged
merged 7 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions crates/re_sdk/src/recording_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1403,6 +1403,11 @@ impl RecordingStream {
/// If the current sink is in a broken state (e.g. a TCP sink with a broken connection that
/// cannot be repaired), all pending data in its buffers will be dropped.
pub fn set_sink(&self, sink: Box<dyn LogSink>) {
if self.is_forked_child() {
re_log::error_once!("Fork detected during set_sink. cleanup_if_forked() should always be called after forking. This is likely a bug in the SDK.");
return;
}

let f = move |inner: &RecordingStreamInner| {
// NOTE: Internal channels can never be closed outside of the `Drop` impl, all these sends
// are safe.
Expand Down Expand Up @@ -1435,6 +1440,11 @@ impl RecordingStream {
/// This does **not** wait for the flush to propagate (see [`Self::flush_blocking`]).
/// See [`RecordingStream`] docs for ordering semantics and multithreading guarantees.
pub fn flush_async(&self) {
if self.is_forked_child() {
re_log::error_once!("Fork detected during flush_async. cleanup_if_forked() should always be called after forking. This is likely a bug in the SDK.");
return;
}

let f = move |inner: &RecordingStreamInner| {
// NOTE: Internal channels can never be closed outside of the `Drop` impl, all these sends
// are safe.
Expand Down
8 changes: 7 additions & 1 deletion rerun_py/rerun_sdk/rerun/recording_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,13 @@ def to_native(self: RecordingStream | None) -> bindings.PyRecordingStream | None

def __del__(self):  # type: ignore[no-untyped-def]
    native = RecordingStream.to_native(self)
    # TODO(jleibs): This flush is almost certainly redundant (98% sure), but removing it
    # requires more thorough testing.
    # However, it's definitely a problem if we are in a forked child process. The rerun SDK
    # will still detect this case and prevent a hang internally, but will do so with a
    # warning that we should avoid.
    #
    # See: https://github.com/rerun-io/rerun/issues/6223 for context on why this is necessary.
    if native is None or native.is_forked_child():
        return
    bindings.flush(blocking=False, recording=native)


def binary_stream(recording: RecordingStream | None = None) -> BinaryStream:
Expand Down
14 changes: 14 additions & 0 deletions rerun_py/src/python_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,20 @@ fn shutdown(py: Python<'_>) {
#[derive(Clone)]
struct PyRecordingStream(RecordingStream);

#[pymethods]
impl PyRecordingStream {
    /// Determine if this stream is operating in the context of a forked child process.
    ///
    /// This means the stream was created in the parent process. It now exists in the child
    /// process by way of fork, but it is effectively a zombie since its batcher and sink
    /// threads would not have been copied.
    ///
    /// Calling operations such as flush or set_sink will result in an error.
    fn is_forked_child(&self) -> bool {
        self.0.is_forked_child()
    }
}

impl std::ops::Deref for PyRecordingStream {
type Target = RecordingStream;

Expand Down
34 changes: 34 additions & 0 deletions rerun_py/tests/unit/test_multiprocessing_gc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from __future__ import annotations

import gc

import rerun as rr

# If torch is available, use torch.multiprocessing instead of multiprocessing
# since it causes more issues. But, it's annoying to always require it so at
# least for the tests in other contexts, we'll use the standard library version.
try:
    import torch.multiprocessing as multiprocessing
except ImportError:
    # NOTE: the suppression must be spelled `type: ignore[...]` — a bare
    # `ignore[no-redef]` is not recognized by mypy and silences nothing.
    import multiprocessing  # type: ignore[no-redef]


def task() -> None:
    # Run inside the forked child: force a full garbage-collection pass.
    # If recording streams were leaked across the fork, collecting them here
    # can hang — a failure mode we've seen specifically with the
    # `torch.multiprocessing` module.
    gc.collect(2)  # generation 2 == full collection, the default for gc.collect()


def test_multiprocessing_gc() -> None:
    """Regression test: a gc pass in a forked child must not deadlock.

    See https://github.com/rerun-io/rerun/issues/6223 — recording streams
    leaked across a fork used to hang the child during `gc.collect()`.
    """
    rr.init("rerun_example_multiprocessing_gc")

    proc = multiprocessing.Process(
        target=task,
    )
    proc.start()
    # A healthy child finishes almost instantly; one second is a generous bound.
    proc.join(1)
    if proc.is_alive():
        # Terminate so our test doesn't get stuck, then join to reap the
        # killed child so it doesn't linger as a zombie process.
        proc.terminate()
        proc.join()
        # `raise AssertionError` instead of `assert False`: asserts are
        # stripped when Python runs with optimizations (-O), which would
        # silently disable this failure path.
        raise AssertionError("Process deadlocked during gc.collect()")
Loading