Skip to content

Commit

Permalink
Introduce a new BinaryStreamSink that allows reading a stream of enco…
Browse files Browse the repository at this point in the history
…ded bytes (#6242)

### What
- Resolves: #6241

This effectively combines the guts of FileSink and MemorySink. Like the
FileSink, this runs file-encoding on its own thread so that all we need
to do when we're ready to read is move out the bytes.

The downside relative to memory_sink is that the stream is a single RRD.
You have no way of accessing information about individual messages,
draining from the backlog, etc.

This is designed to be as easy as possible to wire into a gradio output
stream.

Example usage in a Gradio Component:
```python
@rr.thread_local_stream("rerun_example_live")
def start_stream_direct(state):
    """
    Stream webcam frames (RGB, grayscale and Canny edges) as encoded RRD bytes.

    This is a generator: after logging each frame it yields whatever bytes the
    binary stream has accumulated, which makes it suitable as a streaming output.

    Parameters
    ----------
    state:
        Mutable mapping shared with the caller. ``state['streaming']`` is set to
        ``True`` on entry; the loop runs for as long as it stays truthy.

    """

    state['streaming'] = True

    cap = cv2.VideoCapture(0)
    frame_nr = 0
    stream = rr.binary_stream()

    try:
        while state.get('streaming', False):
            ret, img = cap.read()
            if not ret:
                # No frame was delivered (camera unplugged, end of stream, ...).
                # `img` is not usable in that case, so stop instead of crashing below.
                break

            frame_time_ms = cap.get(cv2.CAP_PROP_POS_MSEC)
            if frame_time_ms != 0:
                rr.set_time_nanos("frame_time", int(frame_time_ms * 1_000_000))

            rr.set_time_sequence("frame_nr", frame_nr)
            frame_nr += 1

            print("Processing frame", frame_nr)

            # OpenCV delivers BGR; convert once so everything downstream is RGB.
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

            # Log the original image
            rr.log("image/rgb", rr.Image(img).compress())

            # Convert to grayscale.
            # `img` is RGB at this point, so the RGB->GRAY code must be used;
            # BGR2GRAY would apply the red and blue luma weights to the wrong channels.
            gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
            rr.log("image/gray", rr.Image(gray).compress())

            # Run the canny edge detector
            canny = cv2.Canny(gray, 50, 200)
            rr.log("image/canny", rr.Image(canny).compress())

            yield stream.read()
    finally:
        # Runs on normal exit, on error, and when the generator is closed early,
        # so the capture device is always handed back.
        cap.release()
```

### Checklist
* [x] I have read and agree to [Contributor
Guide](https://github.com/rerun-io/rerun/blob/main/CONTRIBUTING.md) and
the [Code of
Conduct](https://github.com/rerun-io/rerun/blob/main/CODE_OF_CONDUCT.md)
* [x] I've included a screenshot or gif (if applicable)
* [x] I have tested the web demo (if applicable):
* Using examples from latest `main` build:
[rerun.io/viewer](https://rerun.io/viewer/pr/6242?manifest_url=https://app.rerun.io/version/main/examples_manifest.json)
* Using full set of examples from `nightly` build:
[rerun.io/viewer](https://rerun.io/viewer/pr/6242?manifest_url=https://app.rerun.io/version/nightly/examples_manifest.json)
* [x] The PR title and labels are set such as to maximize their
usefulness for the next release's CHANGELOG
* [x] If applicable, add a new check to the [release
checklist](https://github.com/rerun-io/rerun/blob/main/tests/python/release_checklist)!

- [PR Build Summary](https://build.rerun.io/pr/6242)
- [Recent benchmark results](https://build.rerun.io/graphs/crates.html)
- [Wasm size tracking](https://build.rerun.io/graphs/sizes.html)

To run all checks from `main`, comment on the PR with `@rerun-bot
full-check`.
  • Loading branch information
jleibs authored May 7, 2024
1 parent 9032b01 commit 61193fd
Show file tree
Hide file tree
Showing 7 changed files with 438 additions and 3 deletions.
200 changes: 200 additions & 0 deletions crates/re_sdk/src/binary_stream_sink.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
use std::sync::mpsc::{Receiver, Sender, SyncSender};
use std::sync::Arc;

use parking_lot::Mutex;

use re_log_types::LogMsg;

use crate::sink::LogSink;
use crate::RecordingStream;

/// Failure modes of [`BinaryStreamSink`] construction.
///
/// Both variants can be produced by [`BinaryStreamSink::new`]: one when the
/// encoder is created, the other when the encoder thread is started.
#[derive(thiserror::Error, Debug)]
pub enum BinaryStreamSinkError {
    /// The background encoder thread could not be started.
    #[error("Failed to spawn thread: {0}")]
    SpawnThread(std::io::Error),

    /// The encoder reported an error; converted automatically via `?`.
    #[error("Failed to encode LogMsg: {0}")]
    LogMsgEncode(#[from] re_log_encoding::encoder::EncodeError),
}

/// A unit of work handed to the encoder thread.
enum Command {
    /// Encode this message and append it to the stream.
    Send(LogMsg),

    /// Flush the encoder. The carried sender is dropped once the flush is done,
    /// which wakes whoever is blocked on the matching receiver.
    Flush(SyncSender<()>),
}

impl Command {
    /// Builds a [`Command::Flush`] together with the receiver to wait on.
    fn flush() -> (Self, Receiver<()>) {
        // Zero-capacity channel used as a oneshot: nothing is ever sent through it,
        // the receiver simply unblocks when the sender half is dropped.
        let (done_tx, done_rx) = std::sync::mpsc::sync_channel(0);
        (Self::Flush(done_tx), done_rx)
    }
}

/// Shared byte buffer backing a [`BinaryStreamStorage`].
///
/// The buffer is reference-counted and mutex-protected, so a clone handed to the
/// encoder writes into the very same bytes that the storage later takes out.
/// Cloning therefore shares the buffer rather than copying it, and reading
/// from the storage empties it.
#[derive(Clone, Default)]
struct BinaryStreamStorageInner(Arc<Mutex<std::io::Cursor<Vec<u8>>>>);

impl std::io::Write for BinaryStreamStorageInner {
    /// Appends `buf` at the cursor position of the shared buffer.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        // The lock is held only for the duration of this single call.
        std::io::Write::write(&mut *self.0.lock(), buf)
    }

    /// Forwards the flush to the underlying cursor.
    fn flush(&mut self) -> std::io::Result<()> {
        std::io::Write::flush(&mut *self.0.lock())
    }
}

/// Reader-side handle for the bytes produced by a [`BinaryStreamSink`].
///
/// Every read takes the buffered bytes out of the stream; they are not kept around.
pub struct BinaryStreamStorage {
    /// Buffer shared with the encoder.
    inner: BinaryStreamStorageInner,

    /// The recording stream that is flushed by [`BinaryStreamStorage::flush`].
    rec: RecordingStream,
}

impl BinaryStreamStorage {
/// Create a new binary stream storage.
fn new(rec: RecordingStream) -> Self {
Self {
inner: Default::default(),
rec,
}
}

/// Read and consume the current contents of the buffer.
///
/// This does not flush the underlying batcher.
/// Use [`BinaryStreamStorage::flush`] if you want to guarantee that all
/// logged messages have been written to the stream before you read them.
#[inline]
pub fn read(&self) -> Vec<u8> {
let mut buffer = std::io::Cursor::new(Vec::new());
std::mem::swap(&mut buffer, &mut *self.inner.0.lock());
buffer.into_inner()
}

/// Flush the batcher and log encoder to guarantee that all logged messages
/// have been written to the stream.
///
/// This will block until the flush is complete.
#[inline]
pub fn flush(&self) {
self.rec.flush_blocking();
}
}

impl Drop for BinaryStreamStorage {
    /// Flushes one last time and warns if bytes are being thrown away unread.
    fn drop(&mut self) {
        self.flush();

        // Whatever is still buffered after the flush is lost with this storage.
        let leftover = self.read();
        if leftover.is_empty() {
            return;
        }

        re_log::warn!("Dropping data in BinaryStreamStorage");
    }
}

/// A sink that encodes log messages into an in-memory binary stream.
///
/// The stream contents are encoded in the Rerun Record Data format (rrd).
///
/// There is no memory limit and no back-pressure: if nobody reads from the
/// stream, every logged message stays buffered.
pub struct BinaryStreamSink {
    /// Channel into the encoder thread; `None` tells the thread to stop.
    tx: Mutex<Sender<Option<Command>>>,

    /// Encoder thread handle, joined when the sink is dropped.
    join_handle: Option<std::thread::JoinHandle<()>>,
}

impl Drop for BinaryStreamSink {
    /// Asks the encoder thread to stop and waits for it to finish.
    fn drop(&mut self) {
        // `None` is the shutdown signal; a send failure just means the thread is already gone.
        self.tx.lock().send(None).ok();

        let Some(encoder_thread) = self.join_handle.take() else {
            return;
        };
        encoder_thread.join().ok();
    }
}

impl BinaryStreamSink {
/// Create a pair of a new [`BinaryStreamSink`] and the associated [`BinaryStreamStorage`].
pub fn new(rec: RecordingStream) -> Result<(Self, BinaryStreamStorage), BinaryStreamSinkError> {
let storage = BinaryStreamStorage::new(rec);

// We always compress when writing to a stream
// TODO(jleibs): Make this configurable
let encoding_options = re_log_encoding::EncodingOptions::COMPRESSED;

let (tx, rx) = std::sync::mpsc::channel();

let encoder =
re_log_encoding::encoder::Encoder::new(encoding_options, storage.inner.clone())?;

let join_handle = spawn_and_stream(encoder, rx)?;

Ok((
Self {
tx: tx.into(),
join_handle: Some(join_handle),
},
storage,
))
}
}

impl LogSink for BinaryStreamSink {
    /// Queues `msg` for the encoder thread; a closed channel is silently ignored.
    #[inline]
    fn send(&self, msg: re_log_types::LogMsg) {
        let command = Command::Send(msg);
        self.tx.lock().send(Some(command)).ok();
    }

    /// Queues a flush and blocks until the encoder thread has handled it.
    #[inline]
    fn flush_blocking(&self) {
        let (flush_cmd, done) = Command::flush();

        // The sender lock is released at the end of this statement, before we start waiting.
        self.tx.lock().send(Some(flush_cmd)).ok();

        // Returns once the encoder thread drops its end of the oneshot.
        done.recv().ok();
    }
}

/// Spawn the encoder thread that will write log messages to the binary stream.
fn spawn_and_stream<W: std::io::Write + Send + 'static>(
mut encoder: re_log_encoding::encoder::Encoder<W>,
rx: Receiver<Option<Command>>,
) -> Result<std::thread::JoinHandle<()>, BinaryStreamSinkError> {
std::thread::Builder::new()
.name("binary_stream_encoder".into())
.spawn({
move || {
while let Ok(Some(cmd)) = rx.recv() {
match cmd {
Command::Send(log_msg) => {
if let Err(err) = encoder.append(&log_msg) {
re_log::error!(
"Failed to write log stream to binary stream: {err}"
);
return;
}
}
Command::Flush(oneshot) => {
re_log::trace!("Flushing…");
if let Err(err) = encoder.flush_blocking() {
re_log::error!(
"Failed to flush log stream to binary stream: {err}"
);
return;
}
drop(oneshot); // signals the oneshot
}
}
}
re_log::debug!("Log stream written to binary stream");
}
})
.map_err(BinaryStreamSinkError::SpawnThread)
}
4 changes: 4 additions & 0 deletions crates/re_sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// ----------------
// Private modules:

mod binary_stream_sink;
mod global;
mod log_sink;
mod recording_stream;
Expand Down Expand Up @@ -60,6 +61,9 @@ impl crate::sink::LogSink for re_log_encoding::FileSink {
/// This is how you select whether the log stream ends up
/// sent over TCP, written to file, etc.
pub mod sink {
pub use crate::binary_stream_sink::{
BinaryStreamSink, BinaryStreamSinkError, BinaryStreamStorage,
};
pub use crate::log_sink::{BufferedSink, LogSink, MemorySink, MemorySinkStorage, TcpSink};

#[cfg(not(target_arch = "wasm32"))]
Expand Down
13 changes: 13 additions & 0 deletions crates/re_sdk/src/recording_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use re_web_viewer_server::WebViewerServerPort;
#[cfg(feature = "web_viewer")]
use re_ws_comms::RerunServerPort;

use crate::binary_stream_sink::BinaryStreamStorage;
use crate::sink::{LogSink, MemorySinkStorage};

// ---
Expand Down Expand Up @@ -1595,6 +1596,18 @@ impl RecordingStream {
storage
}

/// Replaces the current sink with a [`crate::sink::BinaryStreamSink`] and hands back the
/// [`BinaryStreamStorage`] from which the encoded bytes can be read.
///
/// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
/// terms of data durability and ordering.
/// See [`Self::set_sink`] for more information.
///
/// If creating the sink fails, the error is returned and the current sink is left in place.
pub fn binary_stream(&self) -> Result<BinaryStreamStorage, crate::sink::BinaryStreamSinkError> {
    crate::sink::BinaryStreamSink::new(self.clone()).map(|(sink, storage)| {
        self.set_sink(Box::new(sink));
        storage
    })
}

/// Swaps the underlying sink for a [`crate::sink::FileSink`] at the specified `path`.
///
/// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
Expand Down
15 changes: 14 additions & 1 deletion rerun_py/rerun_sdk/rerun/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@
from .memory import MemoryRecording, memory_recording
from .notebook import notebook_show
from .recording_stream import (
BinaryStream,
RecordingStream,
binary_stream,
get_application_id,
get_data_recording,
get_global_data_recording,
Expand Down Expand Up @@ -128,7 +130,18 @@ def _init_recording_stream() -> None:
from rerun.recording_stream import _patch as recording_stream_patch

recording_stream_patch(
[connect, save, stdout, disconnect, memory_recording, serve, spawn, send_blueprint, notebook_show]
[
binary_stream,
connect,
save,
stdout,
disconnect,
memory_recording,
serve,
spawn,
send_blueprint,
notebook_show,
]
+ [
set_time_sequence,
set_time_seconds,
Expand Down
67 changes: 67 additions & 0 deletions rerun_py/rerun_sdk/rerun/recording_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,73 @@ def __del__(self): # type: ignore[no-untyped-def]
bindings.flush(blocking=False, recording=recording)


def binary_stream(recording: RecordingStream | None = None) -> BinaryStream:
    """
    Route all log-data into a [`rerun.BinaryStream`] object that can be read from.

    The contents of this stream are encoded in the Rerun Record Data format (rrd).

    This stream has no mechanism of limiting memory or creating back-pressure. If you do not
    read from it, it will buffer all messages that you have logged.

    Example
    -------
    ```python
    stream = rr.binary_stream()

    rr.log("stream", rr.TextLog("Hello world"))

    with open("output.rrd", "wb") as f:
        f.write(stream.read())
    ```

    Parameters
    ----------
    recording:
        Specifies the [`rerun.RecordingStream`][] to use.
        If left unspecified, defaults to the current active data recording, if there is one.
        See also: [`rerun.init`][], [`rerun.set_global_data_recording`][].

    Returns
    -------
    BinaryStream
        An object that can be used to flush or read the data.

    """

    native_recording = RecordingStream.to_native(recording)
    storage = bindings.binary_stream(recording=native_recording)
    return BinaryStream(storage)


class BinaryStream:
    """An encoded stream of bytes that can be saved as an rrd or sent to the viewer."""

    # NOTE(review): the parameter is annotated as `PyMemorySinkStorage`, yet the only
    # methods used on it are `read(flush=...)` and `flush()` — confirm that this is the
    # intended binding class.
    def __init__(self, storage: bindings.PyMemorySinkStorage) -> None:
        # Native storage object that every method below delegates to.
        self.storage = storage

    def read(self, *, flush: bool = True) -> bytes:
        """
        Return the bytes currently available in the stream.

        When `flush` is set, the call first blocks until the flush is complete.

        Parameters
        ----------
        flush:
            If true (default), the stream will be flushed before reading.

        """
        return self.storage.read(flush=flush)  # type: ignore[no-any-return]

    def flush(self) -> None:
        """
        Flush the recording stream so that all logged messages have been encoded into the stream.

        This will block until the flush is complete.
        """
        self.storage.flush()


def _patch(funcs): # type: ignore[no-untyped-def]
"""Adds the given functions as methods to the `RecordingStream` class; injects `recording=self` in passing."""
import functools
Expand Down
Loading

0 comments on commit 61193fd

Please sign in to comment.