Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce new mechanism to incrementally drain from a memory_recording #6187

Merged
merged 10 commits into from
May 2, 2024
76 changes: 64 additions & 12 deletions crates/re_sdk/src/log_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use std::sync::Arc;
use parking_lot::Mutex;
use re_log_types::{BlueprintActivationCommand, LogMsg, StoreId};

use crate::RecordingStream;

/// Where the SDK sends its log messages.
pub trait LogSink: Send + Sync + 'static {
/// Send this log message.
Expand Down Expand Up @@ -126,10 +128,15 @@ impl fmt::Debug for BufferedSink {
///
/// Additionally the raw storage can be accessed and used to create an in-memory RRD.
/// This is useful for things like the inline rrd-viewer in Jupyter notebooks.
#[derive(Default)]
pub struct MemorySink(MemorySinkStorage);

impl MemorySink {
/// Create a new [`MemorySink`] with an associated [`RecordingStream`].
#[inline]
pub fn new(rec: RecordingStream) -> Self {
Self(MemorySinkStorage::new(rec))
}

/// Access the raw `MemorySinkStorage`
#[inline]
pub fn buffer(&self) -> MemorySinkStorage {
Expand All @@ -153,7 +160,11 @@ impl LogSink for MemorySink {

#[inline]
fn drain_backlog(&self) -> Vec<LogMsg> {
self.0.take()
// Note that when draining the backlog, we don't call `take` since that would flush
// the stream. But `drain_backlog` is being called as part of `set_sink`, which has already queued
// a flush of the batcher. Queueing a second flush here seems to lead to a deadlock
// at shutdown.
std::mem::take(&mut (self.0.write()))
}
}

Expand All @@ -170,10 +181,10 @@ struct MemorySinkStorageInner {
}

/// The storage used by [`MemorySink`].
#[derive(Default, Clone)]
#[derive(Clone)]
pub struct MemorySinkStorage {
inner: Arc<Mutex<MemorySinkStorageInner>>,
pub(crate) rec: Option<crate::RecordingStream>,
pub(crate) rec: RecordingStream,
}

impl Drop for MemorySinkStorage {
Expand All @@ -194,6 +205,14 @@ impl Drop for MemorySinkStorage {
}

impl MemorySinkStorage {
/// Create a new [`MemorySinkStorage`] with an associated [`RecordingStream`].
fn new(rec: RecordingStream) -> Self {
Self {
inner: Default::default(),
rec,
}
}

/// Write access to the inner array of [`LogMsg`].
#[inline]
fn write(&self) -> parking_lot::MappedMutexGuard<'_, Vec<LogMsg>> {
Expand All @@ -203,8 +222,14 @@ impl MemorySinkStorage {
}

/// How many messages are currently written to this memory sink.
///
/// This automatically takes care of flushing the underlying [`crate::RecordingStream`].
#[inline]
pub fn num_msgs(&self) -> usize {
// NOTE: It's fine, this is an in-memory sink so by definition there's no I/O involved
// in this flush; it's just a matter of making the table batcher tick early.
self.rec.flush_blocking();

self.inner.lock().msgs.len()
}

Expand All @@ -213,15 +238,15 @@ impl MemorySinkStorage {
/// This automatically takes care of flushing the underlying [`crate::RecordingStream`].
#[inline]
pub fn take(&self) -> Vec<LogMsg> {
if let Some(rec) = self.rec.as_ref() {
// NOTE: It's fine, this is an in-memory sink so by definition there's no I/O involved
// in this flush; it's just a matter of making the table batcher tick early.
rec.flush_blocking();
}
// NOTE: It's fine, this is an in-memory sink so by definition there's no I/O involved
// in this flush; it's just a matter of making the table batcher tick early.
self.rec.flush_blocking();
std::mem::take(&mut (self.write()))
}

/// Convert the stored messages into an in-memory Rerun log file.
///
/// This automatically takes care of flushing the underlying [`crate::RecordingStream`].
#[inline]
pub fn concat_memory_sinks_as_bytes(
sinks: &[&Self],
Expand All @@ -233,6 +258,9 @@ impl MemorySinkStorage {
let mut encoder =
re_log_encoding::encoder::Encoder::new(encoding_options, &mut buffer)?;
for sink in sinks {
// NOTE: It's fine, this is an in-memory sink so by definition there's no I/O involved
// in this flush; it's just a matter of making the table batcher tick early.
sink.rec.flush_blocking();
let mut inner = sink.inner.lock();
inner.has_been_used = true;

Expand All @@ -245,12 +273,36 @@ impl MemorySinkStorage {
Ok(buffer.into_inner())
}

/// Drain the stored messages and return them as an in-memory RRD.
///
/// This automatically takes care of flushing the underlying [`crate::RecordingStream`].
#[inline]
pub fn drain_as_bytes(&self) -> Result<Vec<u8>, re_log_encoding::encoder::EncodeError> {
// NOTE: It's fine, this is an in-memory sink so by definition there's no I/O involved
// in this flush; it's just a matter of making the table batcher tick early.
self.rec.flush_blocking();
let mut buffer = std::io::Cursor::new(Vec::new());

{
let encoding_options = re_log_encoding::EncodingOptions::COMPRESSED;
let mut encoder =
re_log_encoding::encoder::Encoder::new(encoding_options, &mut buffer)?;

let mut inner = self.inner.lock();
inner.has_been_used = true;

for message in &std::mem::take(&mut inner.msgs) {
encoder.append(message)?;
}
jleibs marked this conversation as resolved.
Show resolved Hide resolved
}

Ok(buffer.into_inner())
}

#[inline]
/// Get the [`StoreId`] from the associated `RecordingStream` if it exists.
pub fn store_id(&self) -> Option<StoreId> {
self.rec
.as_ref()
.and_then(|rec| rec.store_info().map(|info| info.store_id.clone()))
self.rec.store_info().map(|info| info.store_id.clone())
}
}
// ----------------------------------------------------------------------------
Expand Down
30 changes: 15 additions & 15 deletions crates/re_sdk/src/recording_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,19 +275,22 @@ impl RecordingStreamBuilder {
pub fn memory(
self,
) -> RecordingStreamResult<(RecordingStream, crate::log_sink::MemorySinkStorage)> {
let sink = crate::log_sink::MemorySink::default();
let mut storage = sink.buffer();

let (enabled, store_info, batcher_config) = self.into_args();
if enabled {
RecordingStream::new(store_info, batcher_config, Box::new(sink)).map(|rec| {
storage.rec = Some(rec.clone());
(rec, storage)
})
let rec = if enabled {
RecordingStream::new(
store_info,
batcher_config,
Box::new(crate::log_sink::BufferedSink::new()),
)
} else {
re_log::debug!("Rerun disabled - call to memory() ignored");
Ok((RecordingStream::disabled(), Default::default()))
}
Ok(RecordingStream::disabled())
}?;

let sink = crate::log_sink::MemorySink::new(rec.clone());
let storage = sink.buffer();
rec.set_sink(Box::new(sink));
Ok((rec, storage))
}

/// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
Expand Down Expand Up @@ -1581,12 +1584,9 @@ impl RecordingStream {
/// terms of data durability and ordering.
/// See [`Self::set_sink`] for more information.
pub fn memory(&self) -> MemorySinkStorage {
let sink = crate::sink::MemorySink::default();
let mut storage = sink.buffer();

let sink = crate::sink::MemorySink::new(self.clone());
let storage = sink.buffer();
self.set_sink(Box::new(sink));
storage.rec = Some(self.clone());

storage
}

Expand Down
129 changes: 2 additions & 127 deletions rerun_py/rerun_sdk/rerun/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,10 @@
get_recording_id,
get_thread_local_data_recording,
is_enabled,
new_recording,
set_global_data_recording,
set_thread_local_data_recording,
thread_local_stream,
)
from .script_helpers import script_add_args, script_setup, script_teardown
from .sinks import connect, disconnect, save, send_blueprint, serve, spawn, stdout
Expand Down Expand Up @@ -255,133 +257,6 @@ def init(
_spawn(default_blueprint=default_blueprint)


# TODO(#3793): defaulting recording_id to authkey should be opt-in
def new_recording(
application_id: str,
*,
recording_id: str | UUID | None = None,
make_default: bool = False,
make_thread_default: bool = False,
spawn: bool = False,
default_enabled: bool = True,
) -> RecordingStream:
"""
Creates a new recording with a user-chosen application id (name) that can be used to log data.
If you only need a single global recording, [`rerun.init`][] might be simpler.
!!! Warning
If you don't specify a `recording_id`, it will default to a random value that is generated once
at the start of the process.
That value will be kept around for the whole lifetime of the process, and even inherited by all
its subprocesses, if any.
This makes it trivial to log data to the same recording in a multiprocess setup, but it also means
that the following code will _not_ create two distinct recordings:
```
rr.init("my_app")
rr.init("my_app")
```
To create distinct recordings from the same process, specify distinct recording IDs:
```
from uuid import uuid4
rec = rr.new_recording(application_id="test", recording_id=uuid4())
rec = rr.new_recording(application_id="test", recording_id=uuid4())
```
Parameters
----------
application_id : str
Your Rerun recordings will be categorized by this application id, so
try to pick a unique one for each application that uses the Rerun SDK.
For example, if you have one application doing object detection
and another doing camera calibration, you could have
`rerun.init("object_detector")` and `rerun.init("calibrator")`.
recording_id : Optional[str]
Set the recording ID that this process is logging to, as a UUIDv4.
The default recording_id is based on `multiprocessing.current_process().authkey`
which means that all processes spawned with `multiprocessing`
will have the same default recording_id.
If you are not using `multiprocessing` and still want several different Python
processes to log to the same Rerun instance (and be part of the same recording),
you will need to manually assign them all the same recording_id.
Any random UUIDv4 will work, or copy the recording id for the parent process.
make_default : bool
If true (_not_ the default), the newly initialized recording will replace the current
active one (if any) in the global scope.
make_thread_default : bool
If true (_not_ the default), the newly initialized recording will replace the current
active one (if any) in the thread-local scope.
spawn : bool
Spawn a Rerun Viewer and stream logging data to it.
Short for calling `spawn` separately.
If you don't call this, log events will be buffered indefinitely until
you call either `connect`, `show`, or `save`
default_enabled
Should Rerun logging be on by default?
Can be overridden with the RERUN env-var, e.g. `RERUN=on` or `RERUN=off`.
Returns
-------
RecordingStream
A handle to the [`rerun.RecordingStream`][]. Use it to log data to Rerun.
"""

application_path = None

# NOTE: It'd be even nicer to do such thing on the Rust-side so that this little trick would
# only need to be written once and just work for all languages out of the box… unfortunately
# we lose most of the details of the python part of the backtrace once we go over the bridge.
#
# Still, better than nothing!
try:
import inspect
import pathlib

# We're trying to grab the filesystem path of the example script that called `init()`.
# The tricky part is that we don't know how many layers are between this script and the
# original caller, so we have to walk the stack and look for anything that might look like
# an official Rerun example.

MAX_FRAMES = 10 # try the first 10 frames, should be more than enough
FRAME_FILENAME_INDEX = 1 # `FrameInfo` tuple has `filename` at index 1

stack = inspect.stack()
for frame in stack[:MAX_FRAMES]:
filename = frame[FRAME_FILENAME_INDEX]
path = pathlib.Path(str(filename)).resolve() # normalize before comparison!
if "rerun/examples" in str(path):
application_path = path
except Exception:
pass

if recording_id is not None:
recording_id = str(recording_id)

recording = RecordingStream(
bindings.new_recording(
application_id=application_id,
recording_id=recording_id,
make_default=make_default,
make_thread_default=make_thread_default,
application_path=application_path,
default_enabled=default_enabled,
)
)

if spawn:
from rerun.sinks import spawn as _spawn

_spawn(recording=recording)

return recording


def version() -> str:
"""
Returns a verbose version string of the Rerun SDK.
Expand Down
8 changes: 8 additions & 0 deletions rerun_py/rerun_sdk/rerun/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ def num_msgs(self) -> int:
"""
return self.storage.num_msgs() # type: ignore[no-any-return]

def drain_as_bytes(self) -> bytes:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't love this name but was struggling to come up with a better one.

"""
Drains the MemoryRecording and returns the data as bytes.
This will flush the current sink before returning.
jleibs marked this conversation as resolved.
Show resolved Hide resolved
"""
return self.storage.drain_as_bytes() # type: ignore[no-any-return]

@deprecated("Please use rerun.notebook_show() instead.")
def as_html(
self,
Expand Down
Loading
Loading