Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce new mechanism to incrementally drain from a memory_recording #6187

Merged
merged 10 commits into from
May 2, 2024
21 changes: 21 additions & 0 deletions crates/re_sdk/src/log_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,27 @@ impl MemorySinkStorage {
Ok(buffer.into_inner())
}

/// Drain the stored messages and return them as an in-memory RRD.
///
/// This empties the storage: messages returned here will not be returned again,
/// and a subsequent call only yields messages logged after this one.
#[inline]
pub fn drain_as_bytes(&self) -> Result<Vec<u8>, re_log_encoding::encoder::EncodeError> {
    let mut buffer = std::io::Cursor::new(Vec::new());

    {
        let encoding_options = re_log_encoding::EncodingOptions::COMPRESSED;
        let mut encoder =
            re_log_encoding::encoder::Encoder::new(encoding_options, &mut buffer)?;

        // Atomically drain the queued messages while holding the lock, but
        // release it *before* the (potentially expensive) encoding below so we
        // don't block concurrent loggers.
        let msgs = {
            let mut inner = self.inner.lock();
            inner.has_been_used = true;
            std::mem::take(&mut inner.msgs)
        };

        for message in &msgs {
            encoder.append(message)?;
        }
    }

    Ok(buffer.into_inner())
}

#[inline]
/// Get the [`StoreId`] from the associated `RecordingStream` if it exists.
pub fn store_id(&self) -> Option<StoreId> {
Expand Down
129 changes: 2 additions & 127 deletions rerun_py/rerun_sdk/rerun/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,10 @@
get_recording_id,
get_thread_local_data_recording,
is_enabled,
new_recording,
set_global_data_recording,
set_thread_local_data_recording,
thread_local_stream,
)
from .script_helpers import script_add_args, script_setup, script_teardown
from .sinks import connect, disconnect, save, send_blueprint, serve, spawn, stdout
Expand Down Expand Up @@ -256,133 +258,6 @@ def init(
_spawn(default_blueprint=default_blueprint)


# TODO(#3793): defaulting recording_id to authkey should be opt-in
def new_recording(
    application_id: str,
    *,
    recording_id: str | UUID | None = None,
    make_default: bool = False,
    make_thread_default: bool = False,
    spawn: bool = False,
    default_enabled: bool = True,
) -> RecordingStream:
    """
    Creates a new recording with a user-chosen application id (name) that can be used to log data.

    If you only need a single global recording, [`rerun.init`][] might be simpler.

    !!! Warning
        If you don't specify a `recording_id`, it will default to a random value that is generated once
        at the start of the process.
        That value will be kept around for the whole lifetime of the process, and even inherited by all
        its subprocesses, if any.

        This makes it trivial to log data to the same recording in a multiprocess setup, but it also means
        that the following code will _not_ create two distinct recordings:
        ```
        rr.init("my_app")
        rr.init("my_app")
        ```

        To create distinct recordings from the same process, specify distinct recording IDs:
        ```
        from uuid import uuid4
        rec = rr.new_recording(application_id="test", recording_id=uuid4())
        rec = rr.new_recording(application_id="test", recording_id=uuid4())
        ```

    Parameters
    ----------
    application_id : str
        Your Rerun recordings will be categorized by this application id, so
        try to pick a unique one for each application that uses the Rerun SDK.

        For example, if you have one application doing object detection
        and another doing camera calibration, you could have
        `rerun.init("object_detector")` and `rerun.init("calibrator")`.
    recording_id : Optional[str]
        Set the recording ID that this process is logging to, as a UUIDv4.

        The default recording_id is based on `multiprocessing.current_process().authkey`
        which means that all processes spawned with `multiprocessing`
        will have the same default recording_id.

        If you are not using `multiprocessing` and still want several different Python
        processes to log to the same Rerun instance (and be part of the same recording),
        you will need to manually assign them all the same recording_id.
        Any random UUIDv4 will work, or copy the recording id for the parent process.
    make_default : bool
        If true (_not_ the default), the newly initialized recording will replace the current
        active one (if any) in the global scope.
    make_thread_default : bool
        If true (_not_ the default), the newly initialized recording will replace the current
        active one (if any) in the thread-local scope.
    spawn : bool
        Spawn a Rerun Viewer and stream logging data to it.
        Short for calling `spawn` separately.
        If you don't call this, log events will be buffered indefinitely until
        you call either `connect`, `show`, or `save`
    default_enabled
        Should Rerun logging be on by default?
        Can be overridden with the RERUN env-var, e.g. `RERUN=on` or `RERUN=off`.

    Returns
    -------
    RecordingStream
        A handle to the [`rerun.RecordingStream`][]. Use it to log data to Rerun.

    """

    # Filesystem path of the official Rerun example that (indirectly) called us, if any.
    application_path = None

    # NOTE: It'd be even nicer to do such thing on the Rust-side so that this little trick would
    # only need to be written once and just work for all languages out of the box… unfortunately
    # we lose most of the details of the python part of the backtrace once we go over the bridge.
    #
    # Still, better than nothing!
    try:
        import inspect
        import pathlib

        # We're trying to grab the filesystem path of the example script that called `init()`.
        # The tricky part is that we don't know how many layers are between this script and the
        # original caller, so we have to walk the stack and look for anything that might look like
        # an official Rerun example.

        MAX_FRAMES = 10  # try the first 10 frames, should be more than enough
        FRAME_FILENAME_INDEX = 1  # `FrameInfo` tuple has `filename` at index 1

        stack = inspect.stack()
        # NOTE(review): there is no `break` here, so the *last* (outermost) matching frame
        # wins — presumably deliberate so the top-level example script is picked; confirm.
        for frame in stack[:MAX_FRAMES]:
            filename = frame[FRAME_FILENAME_INDEX]
            path = pathlib.Path(str(filename)).resolve()  # normalize before comparison!
            if "rerun/examples" in str(path):
                application_path = path
    except Exception:
        # Best-effort only: failing to detect the example path must never break recording creation.
        pass

    # Accept `uuid.UUID` objects by normalizing them to their string form for the bindings.
    if recording_id is not None:
        recording_id = str(recording_id)

    recording = RecordingStream(
        bindings.new_recording(
            application_id=application_id,
            recording_id=recording_id,
            make_default=make_default,
            make_thread_default=make_thread_default,
            application_path=application_path,
            default_enabled=default_enabled,
        )
    )

    if spawn:
        # Imported lazily to avoid a circular import at module load time.
        from rerun.sinks import spawn as _spawn

        _spawn(recording=recording)

    return recording


def version() -> str:
"""
Returns a verbose version string of the Rerun SDK.
Expand Down
8 changes: 8 additions & 0 deletions rerun_py/rerun_sdk/rerun/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ def num_msgs(self) -> int:
"""
return self.storage.num_msgs() # type: ignore[no-any-return]

def drain_as_bytes(self) -> bytes:
    """
    Drains the MemoryRecording and returns the data as bytes.

    This will flush the current sink before returning.

    Draining empties the stored messages (the native storage `take`s them),
    so a subsequent call only returns data logged after this one.
    """
    # Thin delegation to the native storage object.
    return self.storage.drain_as_bytes()  # type: ignore[no-any-return]

@deprecated("Please use rerun.notebook_show() instead.")
def as_html(
self,
Expand Down
179 changes: 178 additions & 1 deletion rerun_py/rerun_sdk/rerun/recording_stream.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,139 @@
from __future__ import annotations

import functools
import inspect
import uuid
from typing import Any, Callable, TypeVar

from rerun import bindings


# ---
# TODO(#3793): defaulting recording_id to authkey should be opt-in
def new_recording(
    application_id: str,
    *,
    recording_id: str | uuid.UUID | None = None,
    make_default: bool = False,
    make_thread_default: bool = False,
    spawn: bool = False,
    default_enabled: bool = True,
) -> RecordingStream:
    """
    Creates a new recording with a user-chosen application id (name) that can be used to log data.

    If a single global recording is all you need, [`rerun.init`][] might be simpler.

    !!! Warning
        If you don't specify a `recording_id`, it will default to a random value that is generated once
        at the start of the process.
        That value will be kept around for the whole lifetime of the process, and even inherited by all
        its subprocesses, if any.

        This makes it trivial to log data to the same recording in a multiprocess setup, but it also means
        that the following code will _not_ create two distinct recordings:
        ```
        rr.init("my_app")
        rr.init("my_app")
        ```

        To create distinct recordings from the same process, specify distinct recording IDs:
        ```
        from uuid import uuid4
        rec = rr.new_recording(application_id="test", recording_id=uuid4())
        rec = rr.new_recording(application_id="test", recording_id=uuid4())
        ```

    Parameters
    ----------
    application_id : str
        Your Rerun recordings will be categorized by this application id, so
        try to pick a unique one for each application that uses the Rerun SDK.

        For example, if you have one application doing object detection
        and another doing camera calibration, you could have
        `rerun.init("object_detector")` and `rerun.init("calibrator")`.
    recording_id : Optional[str]
        Set the recording ID that this process is logging to, as a UUIDv4.

        The default recording_id is based on `multiprocessing.current_process().authkey`
        which means that all processes spawned with `multiprocessing`
        will have the same default recording_id.

        If you are not using `multiprocessing` and still want several different Python
        processes to log to the same Rerun instance (and be part of the same recording),
        you will need to manually assign them all the same recording_id.
        Any random UUIDv4 will work, or copy the recording id for the parent process.
    make_default : bool
        If true (_not_ the default), the newly initialized recording will replace the current
        active one (if any) in the global scope.
    make_thread_default : bool
        If true (_not_ the default), the newly initialized recording will replace the current
        active one (if any) in the thread-local scope.
    spawn : bool
        Spawn a Rerun Viewer and stream logging data to it.
        Short for calling `spawn` separately.
        If you don't call this, log events will be buffered indefinitely until
        you call either `connect`, `show`, or `save`
    default_enabled
        Should Rerun logging be on by default?
        Can be overridden with the RERUN env-var, e.g. `RERUN=on` or `RERUN=off`.

    Returns
    -------
    RecordingStream
        A handle to the [`rerun.RecordingStream`][]. Use it to log data to Rerun.

    """

    # Best-effort detection of whether the caller is an official Rerun example, so its
    # path can be forwarded along. This has to happen on the Python side: most of the
    # Python backtrace detail is lost once we cross the bridge into Rust.
    application_path = None
    try:
        import inspect
        import pathlib

        # We don't know how many wrapper layers sit between here and the original
        # caller, so scan a handful of stack frames for anything that lives under
        # `rerun/examples`.
        MAX_FRAMES = 10  # plenty for any realistic call depth
        FRAME_FILENAME_INDEX = 1  # `FrameInfo` is a tuple; `filename` sits at index 1

        for frame_info in inspect.stack()[:MAX_FRAMES]:
            candidate = pathlib.Path(str(frame_info[FRAME_FILENAME_INDEX])).resolve()
            if "rerun/examples" in str(candidate):
                application_path = candidate
    except Exception:
        pass  # purely cosmetic — never fail recording creation over it

    recording = RecordingStream(
        bindings.new_recording(
            application_id=application_id,
            # `uuid.UUID` inputs are normalized to their string form for the bindings.
            recording_id=None if recording_id is None else str(recording_id),
            make_default=make_default,
            make_thread_default=make_thread_default,
            application_path=application_path,
            default_enabled=default_enabled,
        )
    )

    if spawn:
        from rerun.sinks import spawn as _spawn  # lazy import avoids a cycle

        _spawn(recording=recording)

    return recording


class RecordingStream:
Expand Down Expand Up @@ -89,7 +220,6 @@ def _patch(funcs): # type: ignore[no-untyped-def]
"""Adds the given functions as methods to the `RecordingStream` class; injects `recording=self` in passing."""
import functools
import os
from typing import Any

# If this is a special RERUN_APP_ONLY context (launched via .spawn), we
# can bypass everything else, which keeps us from monkey patching methods
Expand Down Expand Up @@ -272,3 +402,50 @@ def set_thread_local_data_recording(recording: RecordingStream) -> RecordingStre
"""
result = bindings.set_thread_local_data_recording(recording=RecordingStream.to_native(recording))
return RecordingStream(result) if result is not None else None


_TFunc = TypeVar("_TFunc", bound=Callable[..., Any])


def thread_local_stream(application_id: str) -> Callable[[_TFunc], _TFunc]:
    """
    Create a thread-local recording stream and use it when executing the decorated function.

    Handy for decorating a job- or task-like function that should produce its own
    isolated recording.

    Parameters
    ----------
    application_id : str
        The application ID that this recording is associated with.

    """

    def decorator(func: _TFunc) -> _TFunc:
        # Generator functions need special handling: the recording context must stay
        # open across *every* yield, not just until the first one.
        if inspect.isgeneratorfunction(func):

            @functools.wraps(func)
            def generator_wrapper(*args: Any, **kwargs: Any) -> Any:
                inner = func(*args, **kwargs)
                try:
                    with new_recording(application_id, recording_id=uuid.uuid4()):
                        item = next(inner)  # prime the generator inside the context
                        while True:
                            # Relay values (and anything sent back) both ways.
                            item = inner.send((yield item))
                except StopIteration:
                    pass  # the inner generator is exhausted — normal termination
                finally:
                    inner.close()

            return generator_wrapper  # type: ignore[return-value]

        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            # Plain function: run the whole call inside a fresh recording context.
            with new_recording(application_id, recording_id=uuid.uuid4()):
                return func(*args, **kwargs)

        return wrapper  # type: ignore[return-value]

    return decorator
Loading
Loading