Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Work around some issues where recording streams leaking context when used with generators #6240

Merged
merged 4 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions rerun_py/docs/gen_common_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,8 @@ class Section:
"start_web_viewer_server",
"escape_entity_path_part",
"new_entity_path",
"thread_local_stream",
"recording_stream_generator_ctx",
],
class_list=["RecordingStream", "LoggingHandler", "MemoryRecording"],
),
Expand Down
1 change: 1 addition & 0 deletions rerun_py/rerun_sdk/rerun/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
get_thread_local_data_recording,
is_enabled,
new_recording,
recording_stream_generator_ctx,
set_global_data_recording,
set_thread_local_data_recording,
thread_local_stream,
Expand Down
125 changes: 121 additions & 4 deletions rerun_py/rerun_sdk/rerun/recording_stream.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import contextvars
import functools
import inspect
import uuid
Expand Down Expand Up @@ -136,6 +137,14 @@ def new_recording(
return recording


active_recording_stream: contextvars.ContextVar[RecordingStream] = contextvars.ContextVar("active_recording_stream")
"""
A context variable that tracks the currently active recording stream.
Used to managed and detect interactions between generators and RecordingStream context-manager objects.
"""


class RecordingStream:
"""
A RecordingStream is used to send data to Rerun.
Expand All @@ -156,6 +165,10 @@ class RecordingStream:
with rec:
rr.log(...)
```
WARNING: if using a RecordingStream as a context manager, yielding from a generator function
while holding the context open will leak the context and likely cause your program to send data
to the wrong stream. See: https://github.com/rerun-io/rerun/issues/6238. You can work around this
by using the [`rerun.recording_stream_generator_ctx`][] decorator.
See also: [`rerun.get_data_recording`][], [`rerun.get_global_data_recording`][],
[`rerun.get_thread_local_data_recording`][].
Expand Down Expand Up @@ -199,13 +212,30 @@ class RecordingStream:
def __init__(self, inner: bindings.PyRecordingStream) -> None:
self.inner = inner
self._prev: RecordingStream | None = None
self.context_token: contextvars.Token[RecordingStream] | None = None

def __enter__(self): # type: ignore[no-untyped-def]
self.context_token = active_recording_stream.set(self)
self._prev = set_thread_local_data_recording(self)
return self

def __exit__(self, type, value, traceback): # type: ignore[no-untyped-def]
self._prev = set_thread_local_data_recording(self._prev) # type: ignore[arg-type]
current_recording = active_recording_stream.get(None)

# Restore the context state
if self.context_token is not None:
active_recording_stream.reset(self.context_token)

# Restore the recording stream state
set_thread_local_data_recording(self._prev) # type: ignore[arg-type]
self._prev = None

# Sanity check: we set this context-var on enter. If it's not still set, something weird
# happened. The user is probably doing something sketch with generators or async code.
if current_recording is not self:
raise RuntimeError(
"RecordingStream context manager exited while not active. Likely mixing context managers with generators or async code. See: `recording_stream_generator_ctx`."
)

# NOTE: The type is a string because we cannot reference `RecordingStream` yet at this point.
def to_native(self: RecordingStream | None) -> bindings.PyRecordingStream | None:
Expand Down Expand Up @@ -441,12 +471,21 @@ def decorator(func: _TFunc) -> _TFunc:

@functools.wraps(func)
def generator_wrapper(*args: Any, **kwargs: Any) -> Any:
# The following code is structured to avoid leaking the recording stream
# context when yielding from the generator.
# See: https://github.com/rerun-io/rerun/issues/6238
#
# The basic idea is to only ever hold the context object open while
# the generator is actively running, but to release it prior to yielding.
gen = func(*args, **kwargs)
stream = new_recording(application_id, recording_id=uuid.uuid4())
try:
with new_recording(application_id, recording_id=uuid.uuid4()):
with stream:
value = next(gen) # Start the generator inside the context
while True:
value = gen.send((yield value)) # Continue the generator
while True:
cont = yield value # Yield the value, suspending the generator
with stream:
value = gen.send(cont) # Resume the generator inside the context
except StopIteration:
pass
finally:
Expand All @@ -464,3 +503,81 @@ def wrapper(*args: Any, **kwargs: Any) -> Any:
return wrapper # type: ignore[return-value]

return decorator


def recording_stream_generator_ctx(func: _TFunc) -> _TFunc:
"""
Decorator to manage recording stream context for generator functions.
This is only necessary if you need to implement a generator which yields while holding an open
recording stream context which it created. This decorator will ensure that the recording stream
context is suspended and then properly resumed upon re-entering the generator.
See: https://github.com/rerun-io/rerun/issues/6238 for context on why this is necessary.
There are plenty of things that can go wrong when mixing context managers with generators, so
don't use this decorator unless you're sure you need it.
If you can plumb through `RecordingStream` objects and use those directly instead of relying on
the context manager, that will always be more robust.
Example
-------
```python
@rr.recording_stream.recording_stream_generator_ctx
def my_generator(name: str) -> Iterator[None]:
with rr.new_recording(name):
rr.save(f"{name}.rrd")
for i in range(10):
rr.log("stream", rr.TextLog(f"{name} {i}"))
yield i
for i in my_generator("foo"):
pass
```
"""
if inspect.isgeneratorfunction(func): # noqa: F821

@functools.wraps(func)
def generator_wrapper(*args: Any, **kwargs: Any) -> Any:
# The following code is structured to avoid leaking the recording stream
# context when yielding from the generator.
# See: https://github.com/rerun-io/rerun/issues/6238
#
# The basic idea is to only ever hold the context object open while
# the generator is actively running, but to release it prior to yielding.
gen = func(*args, **kwargs)
current_recording = None
try:
value = next(gen) # Get the first generated value
while True:
current_recording = active_recording_stream.get(None)

if current_recording is not None:
# TODO(jleibs): Do we need to pass something through here?
# Probably not, since __exit__ doesn't use those args, but
# keep an eye on this.
current_recording.__exit__(None, None, None) # Exit our context before we yield

cont = yield value # Yield the value, suspending the generator

if current_recording is not None:
current_recording.__enter__() # Restore our context before we continue

value = gen.send(cont) # Resume the generator inside the context

except StopIteration:
# StopIteration is raised from inside `gen.send()`. This happens after a call
# `__enter__` and means we don't need to enter during finally, below.
current_recording = None
finally:
# If we never reached the end of the iterator (StopIteration wasn't raised), then
# we need to enter again before finally closing the generator.
if current_recording is not None:
current_recording.__enter__()
gen.close()

return generator_wrapper # type: ignore[return-value]
else:
raise ValueError("Only generator functions can be decorated with `recording_stream_generator_ctx`")
Loading