Skip to content

Commit

Permalink
Work around some issues where recording streams leaking context when …
Browse files Browse the repository at this point in the history
…used with generators (#6240)

### What
 - Resolves: #6238

In `thread_local_stream`, don't yield while holding the stream context
open. Re-create context before continuing the generator.

Also introduce a new `recording_stream_generator_ctx` for more advanced
usage. This is mainly an escape hatch.

The problem from the example can now be handled using:
```
import rerun as rr

@rr.recording_stream_generator_ctx
def my_gen_func(stream, name):
    with stream:
        for i in range(10):
            print(f"{name} {i}")
            rr.log("stream", rr.TextLog(f"{name} {i}"))
            yield

rr.init("rerun_example_leak_context")

stream1 = rr.new_recording("rerun_example_stream1")
stream1.save("stream1.rrd")
stream2 = rr.new_recording("rerun_example_stream2")
stream2.save("stream2.rrd")

gen1 = my_gen_func(stream1, "stream1")
gen2 = my_gen_func(stream2, "stream2")

next(gen1)
next(gen2)
rr.log("stream", rr.TextLog("This should go to the global stream"))
next(gen1)
next(gen1)
next(gen1)
```

### Checklist
* [x] I have read and agree to [Contributor
Guide](https://github.com/rerun-io/rerun/blob/main/CONTRIBUTING.md) and
the [Code of
Conduct](https://github.com/rerun-io/rerun/blob/main/CODE_OF_CONDUCT.md)
* [x] I've included a screenshot or gif (if applicable)
* [x] I have tested the web demo (if applicable):
* Using examples from latest `main` build:
[rerun.io/viewer](https://rerun.io/viewer/pr/6240?manifest_url=https://app.rerun.io/version/main/examples_manifest.json)
* Using full set of examples from `nightly` build:
[rerun.io/viewer](https://rerun.io/viewer/pr/6240?manifest_url=https://app.rerun.io/version/nightly/examples_manifest.json)
* [x] The PR title and labels are set such as to maximize their
usefulness for the next release's CHANGELOG
* [x] If applicable, add a new check to the [release
checklist](https://github.com/rerun-io/rerun/blob/main/tests/python/release_checklist)!

- [PR Build Summary](https://build.rerun.io/pr/6240)
- [Recent benchmark results](https://build.rerun.io/graphs/crates.html)
- [Wasm size tracking](https://build.rerun.io/graphs/sizes.html)

To run all checks from `main`, comment on the PR with `@rerun-bot
full-check`.
  • Loading branch information
jleibs authored May 7, 2024
1 parent f0e1d32 commit 9032b01
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 4 deletions.
2 changes: 2 additions & 0 deletions rerun_py/docs/gen_common_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,8 @@ class Section:
"start_web_viewer_server",
"escape_entity_path_part",
"new_entity_path",
"thread_local_stream",
"recording_stream_generator_ctx",
],
class_list=["RecordingStream", "LoggingHandler", "MemoryRecording"],
),
Expand Down
1 change: 1 addition & 0 deletions rerun_py/rerun_sdk/rerun/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
get_thread_local_data_recording,
is_enabled,
new_recording,
recording_stream_generator_ctx,
set_global_data_recording,
set_thread_local_data_recording,
thread_local_stream,
Expand Down
125 changes: 121 additions & 4 deletions rerun_py/rerun_sdk/rerun/recording_stream.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import contextvars
import functools
import inspect
import uuid
Expand Down Expand Up @@ -136,6 +137,14 @@ def new_recording(
return recording


active_recording_stream: contextvars.ContextVar[RecordingStream] = contextvars.ContextVar("active_recording_stream")
"""
A context variable that tracks the currently active recording stream.
Used to managed and detect interactions between generators and RecordingStream context-manager objects.
"""


class RecordingStream:
"""
A RecordingStream is used to send data to Rerun.
Expand All @@ -156,6 +165,10 @@ class RecordingStream:
with rec:
rr.log(...)
```
WARNING: if using a RecordingStream as a context manager, yielding from a generator function
while holding the context open will leak the context and likely cause your program to send data
to the wrong stream. See: https://github.com/rerun-io/rerun/issues/6238. You can work around this
by using the [`rerun.recording_stream_generator_ctx`][] decorator.
See also: [`rerun.get_data_recording`][], [`rerun.get_global_data_recording`][],
[`rerun.get_thread_local_data_recording`][].
Expand Down Expand Up @@ -199,13 +212,30 @@ class RecordingStream:
def __init__(self, inner: bindings.PyRecordingStream) -> None:
self.inner = inner
self._prev: RecordingStream | None = None
self.context_token: contextvars.Token[RecordingStream] | None = None

def __enter__(self): # type: ignore[no-untyped-def]
self.context_token = active_recording_stream.set(self)
self._prev = set_thread_local_data_recording(self)
return self

def __exit__(self, type, value, traceback): # type: ignore[no-untyped-def]
self._prev = set_thread_local_data_recording(self._prev) # type: ignore[arg-type]
current_recording = active_recording_stream.get(None)

# Restore the context state
if self.context_token is not None:
active_recording_stream.reset(self.context_token)

# Restore the recording stream state
set_thread_local_data_recording(self._prev) # type: ignore[arg-type]
self._prev = None

# Sanity check: we set this context-var on enter. If it's not still set, something weird
# happened. The user is probably doing something sketch with generators or async code.
if current_recording is not self:
raise RuntimeError(
"RecordingStream context manager exited while not active. Likely mixing context managers with generators or async code. See: `recording_stream_generator_ctx`."
)

# NOTE: The type is a string because we cannot reference `RecordingStream` yet at this point.
def to_native(self: RecordingStream | None) -> bindings.PyRecordingStream | None:
Expand Down Expand Up @@ -441,12 +471,21 @@ def decorator(func: _TFunc) -> _TFunc:

@functools.wraps(func)
def generator_wrapper(*args: Any, **kwargs: Any) -> Any:
# The following code is structured to avoid leaking the recording stream
# context when yielding from the generator.
# See: https://github.com/rerun-io/rerun/issues/6238
#
# The basic idea is to only ever hold the context object open while
# the generator is actively running, but to release it prior to yielding.
gen = func(*args, **kwargs)
stream = new_recording(application_id, recording_id=uuid.uuid4())
try:
with new_recording(application_id, recording_id=uuid.uuid4()):
with stream:
value = next(gen) # Start the generator inside the context
while True:
value = gen.send((yield value)) # Continue the generator
while True:
cont = yield value # Yield the value, suspending the generator
with stream:
value = gen.send(cont) # Resume the generator inside the context
except StopIteration:
pass
finally:
Expand All @@ -464,3 +503,81 @@ def wrapper(*args: Any, **kwargs: Any) -> Any:
return wrapper # type: ignore[return-value]

return decorator


def recording_stream_generator_ctx(func: _TFunc) -> _TFunc:
"""
Decorator to manage recording stream context for generator functions.
This is only necessary if you need to implement a generator which yields while holding an open
recording stream context which it created. This decorator will ensure that the recording stream
context is suspended and then properly resumed upon re-entering the generator.
See: https://github.com/rerun-io/rerun/issues/6238 for context on why this is necessary.
There are plenty of things that can go wrong when mixing context managers with generators, so
don't use this decorator unless you're sure you need it.
If you can plumb through `RecordingStream` objects and use those directly instead of relying on
the context manager, that will always be more robust.
Example
-------
```python
@rr.recording_stream.recording_stream_generator_ctx
def my_generator(name: str) -> Iterator[None]:
with rr.new_recording(name):
rr.save(f"{name}.rrd")
for i in range(10):
rr.log("stream", rr.TextLog(f"{name} {i}"))
yield i
for i in my_generator("foo"):
pass
```
"""
if inspect.isgeneratorfunction(func): # noqa: F821

@functools.wraps(func)
def generator_wrapper(*args: Any, **kwargs: Any) -> Any:
# The following code is structured to avoid leaking the recording stream
# context when yielding from the generator.
# See: https://github.com/rerun-io/rerun/issues/6238
#
# The basic idea is to only ever hold the context object open while
# the generator is actively running, but to release it prior to yielding.
gen = func(*args, **kwargs)
current_recording = None
try:
value = next(gen) # Get the first generated value
while True:
current_recording = active_recording_stream.get(None)

if current_recording is not None:
# TODO(jleibs): Do we need to pass something through here?
# Probably not, since __exit__ doesn't use those args, but
# keep an eye on this.
current_recording.__exit__(None, None, None) # Exit our context before we yield

cont = yield value # Yield the value, suspending the generator

if current_recording is not None:
current_recording.__enter__() # Restore our context before we continue

value = gen.send(cont) # Resume the generator inside the context

except StopIteration:
# StopIteration is raised from inside `gen.send()`. This happens after a call
# `__enter__` and means we don't need to enter during finally, below.
current_recording = None
finally:
# If we never reached the end of the iterator (StopIteration wasn't raised), then
# we need to enter again before finally closing the generator.
if current_recording is not None:
current_recording.__enter__()
gen.close()

return generator_wrapper # type: ignore[return-value]
else:
raise ValueError("Only generator functions can be decorated with `recording_stream_generator_ctx`")

0 comments on commit 9032b01

Please sign in to comment.