Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce a new BinaryStreamSink that allows reading a stream of encoded bytes #6242

Merged
merged 12 commits into from
May 7, 2024
200 changes: 200 additions & 0 deletions crates/re_sdk/src/binary_stream_sink.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
use std::sync::mpsc::{Receiver, Sender, SyncSender};
use std::sync::Arc;

use parking_lot::Mutex;

use re_log_types::LogMsg;

use crate::sink::LogSink;
use crate::RecordingStream;

/// Errors that can occur when creating a [`BinaryStreamSink`].
#[derive(thiserror::Error, Debug)]
pub enum BinaryStreamSinkError {
/// Error spawning the writer thread.
#[error("Failed to spawn thread: {0}")]
SpawnThread(std::io::Error),

/// Error encoding a log message.
#[error("Failed to encode LogMsg: {0}")]
LogMsgEncode(#[from] re_log_encoding::encoder::EncodeError),
}

enum Command {
Send(LogMsg),
Flush(SyncSender<()>),
}

impl Command {
fn flush() -> (Self, Receiver<()>) {
let (tx, rx) = std::sync::mpsc::sync_channel(0); // oneshot
(Self::Flush(tx), rx)
}
}
Comment on lines +28 to +33
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I found 3 implementations of this, two of them using crossbeam one using mpsc like you here. I like that you bring balance to the world ;)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good enough for FileSink, good enough for me 🤷


/// The inner storage used by [`BinaryStreamStorage`].
///
/// Although this implements Clone so that it can be shared between the encoder thread and the outer
/// storage, the model is that reading from it consumes the buffer.
#[derive(Clone, Default)]
struct BinaryStreamStorageInner(Arc<Mutex<std::io::Cursor<Vec<u8>>>>);

impl std::io::Write for BinaryStreamStorageInner {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.0.lock().write(buf)
}

fn flush(&mut self) -> std::io::Result<()> {
self.0.lock().flush()
}
}

/// The storage used by [`BinaryStreamSink`].
///
/// Reading from this consumes the bytes from the stream.
pub struct BinaryStreamStorage {
inner: BinaryStreamStorageInner,
rec: RecordingStream,
}

impl BinaryStreamStorage {
/// Create a new binary stream storage.
fn new(rec: RecordingStream) -> Self {
Self {
inner: Default::default(),
rec,
}
}

/// Read and consume the current contents of the buffer.
///
/// This does not flush the underlying batcher.
/// Use [`BinaryStreamStorage::flush`] if you want to guarantee that all
/// logged messages have been written to the stream before you read them.
#[inline]
pub fn read(&self) -> Vec<u8> {
let mut buffer = std::io::Cursor::new(Vec::new());
std::mem::swap(&mut buffer, &mut *self.inner.0.lock());
buffer.into_inner()
}

/// Flush the batcher and log encoder to guarantee that all logged messages
/// have been written to the stream.
///
/// This will block until the flush is complete.
#[inline]
pub fn flush(&self) {
self.rec.flush_blocking();
}
}

impl Drop for BinaryStreamStorage {
fn drop(&mut self) {
self.flush();
let bytes = self.read();

if !bytes.is_empty() {
re_log::warn!("Dropping data in BinaryStreamStorage");
}
}
}

/// Stream log messages to an in-memory binary stream.
///
/// The contents of this stream are encoded in the Rerun Record Data format (rrd).
///
/// This stream has no mechanism of limiting memory or creating back-pressure. If you do not
/// read from it, it will buffer all messages that you have logged.
pub struct BinaryStreamSink {
/// The sender to the encoder thread.
tx: Mutex<Sender<Option<Command>>>,

/// Handle to join the encoder thread on drop.
join_handle: Option<std::thread::JoinHandle<()>>,
}

impl Drop for BinaryStreamSink {
fn drop(&mut self) {
self.tx.lock().send(None).ok();
if let Some(join_handle) = self.join_handle.take() {
join_handle.join().ok();
}
}
}

impl BinaryStreamSink {
/// Create a pair of a new [`BinaryStreamSink`] and the associated [`BinaryStreamStorage`].
pub fn new(rec: RecordingStream) -> Result<(Self, BinaryStreamStorage), BinaryStreamSinkError> {
let storage = BinaryStreamStorage::new(rec);

// We always compress when writing to a stream
// TODO(jleibs): Make this configurable
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any harm in just exposing that as a flag on creation right away?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly that (1) it also applies to every other sink type and (2) I can't come up with a good reason that a user would ever set it to False.

I'd rather introduce them all together in a consistent way if and when we have a compelling use-case.

let encoding_options = re_log_encoding::EncodingOptions::COMPRESSED;

let (tx, rx) = std::sync::mpsc::channel();

let encoder =
re_log_encoding::encoder::Encoder::new(encoding_options, storage.inner.clone())?;

let join_handle = spawn_and_stream(encoder, rx)?;

Ok((
Self {
tx: tx.into(),
join_handle: Some(join_handle),
},
storage,
))
}
}

impl LogSink for BinaryStreamSink {
#[inline]
fn send(&self, msg: re_log_types::LogMsg) {
self.tx.lock().send(Some(Command::Send(msg))).ok();
}

#[inline]
fn flush_blocking(&self) {
let (cmd, oneshot) = Command::flush();
self.tx.lock().send(Some(cmd)).ok();
oneshot.recv().ok();
}
}

/// Spawn the encoder thread that will write log messages to the binary stream.
fn spawn_and_stream<W: std::io::Write + Send + 'static>(
mut encoder: re_log_encoding::encoder::Encoder<W>,
rx: Receiver<Option<Command>>,
) -> Result<std::thread::JoinHandle<()>, BinaryStreamSinkError> {
std::thread::Builder::new()
.name("binary_stream_encoder".into())
.spawn({
move || {
while let Ok(Some(cmd)) = rx.recv() {
match cmd {
Command::Send(log_msg) => {
if let Err(err) = encoder.append(&log_msg) {
re_log::error!(
"Failed to write log stream to binary stream: {err}"
);
return;
}
}
Command::Flush(oneshot) => {
re_log::trace!("Flushing…");
if let Err(err) = encoder.flush_blocking() {
re_log::error!(
"Failed to flush log stream to binary stream: {err}"
);
return;
}
drop(oneshot); // signals the oneshot
}
}
}
re_log::debug!("Log stream written to binary stream");
}
})
.map_err(BinaryStreamSinkError::SpawnThread)
}
4 changes: 4 additions & 0 deletions crates/re_sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// ----------------
// Private modules:

mod binary_stream_sink;
mod global;
mod log_sink;
mod recording_stream;
Expand Down Expand Up @@ -60,6 +61,9 @@ impl crate::sink::LogSink for re_log_encoding::FileSink {
/// This is how you select whether the log stream ends up
/// sent over TCP, written to file, etc.
pub mod sink {
pub use crate::binary_stream_sink::{
BinaryStreamSink, BinaryStreamSinkError, BinaryStreamStorage,
};
pub use crate::log_sink::{BufferedSink, LogSink, MemorySink, MemorySinkStorage, TcpSink};

#[cfg(not(target_arch = "wasm32"))]
Expand Down
13 changes: 13 additions & 0 deletions crates/re_sdk/src/recording_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use re_web_viewer_server::WebViewerServerPort;
#[cfg(feature = "web_viewer")]
use re_ws_comms::RerunServerPort;

use crate::binary_stream_sink::BinaryStreamStorage;
use crate::sink::{LogSink, MemorySinkStorage};

// ---
Expand Down Expand Up @@ -1595,6 +1596,18 @@ impl RecordingStream {
storage
}

/// Swaps the underlying sink for a [`crate::sink::BinaryStreamSink`] sink and returns the associated
/// [`BinaryStreamStorage`].
///
/// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
/// terms of data durability and ordering.
/// See [`Self::set_sink`] for more information.
pub fn binary_stream(&self) -> Result<BinaryStreamStorage, crate::sink::BinaryStreamSinkError> {
let (sink, storage) = crate::sink::BinaryStreamSink::new(self.clone())?;
self.set_sink(Box::new(sink));
Ok(storage)
}

/// Swaps the underlying sink for a [`crate::sink::FileSink`] at the specified `path`.
///
/// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
Expand Down
15 changes: 14 additions & 1 deletion rerun_py/rerun_sdk/rerun/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@
from .memory import MemoryRecording, memory_recording
from .notebook import notebook_show
from .recording_stream import (
BinaryStream,
RecordingStream,
binary_stream,
get_application_id,
get_data_recording,
get_global_data_recording,
Expand Down Expand Up @@ -127,7 +129,18 @@ def _init_recording_stream() -> None:
from rerun.recording_stream import _patch as recording_stream_patch

recording_stream_patch(
[connect, save, stdout, disconnect, memory_recording, serve, spawn, send_blueprint, notebook_show]
[
binary_stream,
connect,
save,
stdout,
disconnect,
memory_recording,
serve,
spawn,
send_blueprint,
notebook_show,
]
+ [
set_time_sequence,
set_time_seconds,
Expand Down
67 changes: 67 additions & 0 deletions rerun_py/rerun_sdk/rerun/recording_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,73 @@ def __del__(self): # type: ignore[no-untyped-def]
bindings.flush(blocking=False, recording=recording)


def binary_stream(recording: RecordingStream | None = None) -> BinaryStream:
"""
Sends all log-data to a [`rerun.BinaryStream`] object that can be read from.

The contents of this stream are encoded in the Rerun Record Data format (rrd).

This stream has no mechanism of limiting memory or creating back-pressure. If you do not
read from it, it will buffer all messages that you have logged.

Example
-------
```python
stream = rr.binary_stream()

rr.log("stream", rr.TextLog("Hello world"))

with open("output.rrd", "wb") as f:
f.write(stream.read())
```

Parameters
----------
recording:
Specifies the [`rerun.RecordingStream`][] to use.
If left unspecified, defaults to the current active data recording, if there is one.
See also: [`rerun.init`][], [`rerun.set_global_data_recording`][].

Returns
-------
BinaryStream
An object that can be used to flush or read the data.

"""

recording = RecordingStream.to_native(recording)
return BinaryStream(bindings.binary_stream(recording=recording))


class BinaryStream:
"""An encoded stream of bytes that can be saved as an rrd or sent to the viewer."""

def __init__(self, storage: bindings.PyMemorySinkStorage) -> None:
self.storage = storage

def read(self, *, flush: bool = True) -> bytes:
"""
Reads the available bytes from the stream.

If using `flush`, the read call will first block until the flush is complete.

Parameters
----------
flush:
If true (default), the stream will be flushed before reading.

"""
return self.storage.read(flush=flush) # type: ignore[no-any-return]

def flush(self) -> None:
"""
Flushes the recording stream and ensures that all logged messages have been encoded into the stream.

This will block until the flush is complete.
"""
self.storage.flush()


def _patch(funcs): # type: ignore[no-untyped-def]
"""Adds the given functions as methods to the `RecordingStream` class; injects `recording=self` in passing."""
import functools
Expand Down
Loading
Loading