Skip to content

Commit

Permalink
Standard input/output support 2: Rust SDK stdout impl/examples/docs (#…
Browse files Browse the repository at this point in the history
…4512)

Allow the Rust SDK to stream RRD data to stdout.

Checks:
- [x] `just py-build && echo 'hello from stdin!' | cargo run -p stdio |
rerun -`

---

Part of a small PR series to add stdio streaming support to our Viewer
and SDKs:
- #4511
- #4512 
- #4513
- #4514
  • Loading branch information
teh-cmc authored Dec 14, 2023
1 parent 022dcde commit 6e49d84
Show file tree
Hide file tree
Showing 9 changed files with 218 additions and 33 deletions.
28 changes: 28 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/re_data_source/src/load_stdin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use re_smart_channel::Sender;
/// Asynchronously loads RRD data streaming in from standard input.
///
/// This fails synchronously iff the standard input stream could not be opened, otherwise errors
/// are handlded asynchronously (as in: they're logged).
/// are handled asynchronously (as in: they're logged).
pub fn load_stdin(tx: Sender<LogMsg>) -> anyhow::Result<()> {
let version_policy = re_log_encoding::decoder::VersionPolicy::Warn;

Expand Down
99 changes: 67 additions & 32 deletions crates/re_log_encoding/src/file_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ pub struct FileSink {
join_handle: Option<std::thread::JoinHandle<()>>,

/// Only used for diagnostics, not for access after `new()`.
path: PathBuf,
///
/// `None` indicates stdout.
path: Option<PathBuf>,
}

impl Drop for FileSink {
Expand Down Expand Up @@ -73,40 +75,31 @@ impl FileSink {

let file = std::fs::File::create(&path)
.map_err(|err| FileSinkError::CreateFile(path.clone(), err))?;
let mut encoder = crate::encoder::Encoder::new(encoding_options, file)?;

let join_handle = std::thread::Builder::new()
.name("file_writer".into())
.spawn({
let path = path.clone();
move || {
while let Ok(Some(cmd)) = rx.recv() {
match cmd {
Command::Send(log_msg) => {
if let Err(err) = encoder.append(&log_msg) {
re_log::error!("Failed to save log stream to {path:?}: {err}");
return;
}
}
Command::Flush(oneshot) => {
re_log::trace!("Flushing…");
if let Err(err) = encoder.flush_blocking() {
re_log::error!("Failed to flush log stream to {path:?}: {err}");
return;
}
drop(oneshot); // signals the oneshot
}
}
}
re_log::debug!("Log stream saved to {path:?}");
}
})
.map_err(FileSinkError::SpawnThread)?;
let encoder = crate::encoder::Encoder::new(encoding_options, file)?;
let join_handle = spawn_and_stream(Some(&path), encoder, rx)?;

Ok(Self {
tx: tx.into(),
join_handle: Some(join_handle),
path: Some(path),
})
}

/// Start writing log messages to standard output.
pub fn stdout() -> Result<Self, FileSinkError> {
let encoding_options = crate::EncodingOptions::COMPRESSED;

let (tx, rx) = std::sync::mpsc::channel();

re_log::debug!("Writing to stdout…");

let encoder = crate::encoder::Encoder::new(encoding_options, std::io::stdout())?;
let join_handle = spawn_and_stream(None, encoder, rx)?;

Ok(Self {
tx: tx.into(),
join_handle: Some(join_handle),
path,
path: None,
})
}

Expand All @@ -123,10 +116,52 @@ impl FileSink {
}
}

/// Set `filepath` to `None` to stream to standard output.
fn spawn_and_stream<W: std::io::Write + Send + 'static>(
filepath: Option<&std::path::Path>,
mut encoder: crate::encoder::Encoder<W>,
rx: Receiver<Option<Command>>,
) -> Result<std::thread::JoinHandle<()>, FileSinkError> {
let (name, target) = if let Some(filepath) = filepath {
("file_writer", filepath.display().to_string())
} else {
("stdout_writer", "stdout".to_owned())
};
std::thread::Builder::new()
.name(name.into())
.spawn({
move || {
while let Ok(Some(cmd)) = rx.recv() {
match cmd {
Command::Send(log_msg) => {
if let Err(err) = encoder.append(&log_msg) {
re_log::error!("Failed to write log stream to {target}: {err}");
return;
}
}
Command::Flush(oneshot) => {
re_log::trace!("Flushing…");
if let Err(err) = encoder.flush_blocking() {
re_log::error!("Failed to flush log stream to {target}: {err}");
return;
}
drop(oneshot); // signals the oneshot
}
}
}
re_log::debug!("Log stream written to {target}");
}
})
.map_err(FileSinkError::SpawnThread)
}

impl fmt::Debug for FileSink {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("FileSink")
.field("path", &self.path)
.field(
"path",
&self.path.as_ref().cloned().unwrap_or("stdout".into()),
)
.finish_non_exhaustive()
}
}
1 change: 1 addition & 0 deletions crates/re_sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ re_sdk_comms = { workspace = true, features = ["client"] }
re_types_core.workspace = true

ahash.workspace = true
atty.workspace = true
crossbeam.workspace = true
document-features.workspace = true
once_cell.workspace = true
Expand Down
58 changes: 58 additions & 0 deletions crates/re_sdk/src/recording_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,38 @@ impl RecordingStreamBuilder {
}
}

/// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to stdout.
///
/// If there isn't any listener at the other end of the pipe, the [`RecordingStream`] will
/// default back to `buffered` mode, in order not to break the user's terminal.
///
/// ## Example
///
/// ```no_run
/// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app").stdout()?;
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```
#[cfg(not(target_arch = "wasm32"))]
pub fn stdout(self) -> RecordingStreamResult<RecordingStream> {
let is_stdout_listening = !atty::is(atty::Stream::Stdout);
if !is_stdout_listening {
return self.buffered();
}

let (enabled, store_info, batcher_config) = self.into_args();

if enabled {
RecordingStream::new(
store_info,
batcher_config,
Box::new(crate::sink::FileSink::stdout()?),
)
} else {
re_log::debug!("Rerun disabled - call to stdout() ignored");
Ok(RecordingStream::disabled())
}
}

/// Spawns a new Rerun Viewer process from an executable available in PATH, then creates a new
/// [`RecordingStream`] that is pre-configured to stream the data through to that viewer over TCP.
///
Expand Down Expand Up @@ -1323,6 +1355,32 @@ impl RecordingStream {
Ok(())
}

/// Swaps the underlying sink for a [`crate::sink::FileSink`] pointed at stdout.
///
/// If there isn't any listener at the other end of the pipe, the [`RecordingStream`] will
/// default back to `buffered` mode, in order not to break the user's terminal.
///
/// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
/// terms of data durability and ordering.
/// See [`Self::set_sink`] for more information.
pub fn stdout(&self) -> Result<(), crate::sink::FileSinkError> {
if forced_sink_path().is_some() {
re_log::debug!("Ignored setting new file since _RERUN_FORCE_SINK is set");
return Ok(());
}

let is_stdout_listening = !atty::is(atty::Stream::Stdout);
if !is_stdout_listening {
self.set_sink(Box::new(crate::log_sink::BufferedSink::new()));
return Ok(());
}

let sink = crate::sink::FileSink::stdout()?;
self.set_sink(Box::new(sink));

Ok(())
}

/// Swaps the underlying sink for a [`crate::sink::BufferedSink`].
///
/// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
Expand Down
12 changes: 12 additions & 0 deletions docs/content/reference/sdk-operating-modes.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,18 @@ Use [`rr.save`](https://ref.rerun.io/docs/python/stable/common/initialization_fu
#### Rust
Use [`RecordingStream::save`](https://docs.rs/rerun/latest/rerun/struct.RecordingStream.html#method.save).


## Standard Input/Output

Streams all logging data to standard output, which can then be loaded by the Rerun Viewer by streaming it from standard input.

#### Rust

Use [`RecordingStream::stdout`](https://docs.rs/rerun/latest/rerun/struct.RecordingStream.html#method.stdout?speculative-link).

Check out our [dedicated example](https://github.com/rerun-io/rerun/tree/latest/examples/rust/stdio/src/main.rs?speculative-link).


## Adding the standard flags to your programs

We provide helpers for both Python & Rust to effortlessly add and properly handle all of these flags in your programs.
Expand Down
10 changes: 10 additions & 0 deletions examples/rust/stdio/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[package]
name = "stdio"
version = "0.12.0-alpha.1+dev"
edition = "2021"
rust-version = "1.72"
license = "MIT OR Apache-2.0"
publish = false

[dependencies]
rerun = { path = "../../../crates/rerun" }
21 changes: 21 additions & 0 deletions examples/rust/stdio/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
---
title: Standard Input/Output example
python: https://github.com/rerun-io/rerun/tree/latest/examples/python/stdio/main.py?speculative-link
rust: https://github.com/rerun-io/rerun/tree/latest/examples/rust/stdio/src/main.rs?speculative-link
cpp: https://github.com/rerun-io/rerun/tree/latest/examples/cpp/stdio/main.cpp?speculative-link
thumbnail: https://static.rerun.io/stdio/25c5aba992d4c8b3861386d8d9539a4823dca117/480w.png
---

<picture>
<img src="https://static.rerun.io/stdio/25c5aba992d4c8b3861386d8d9539a4823dca117/full.png" alt="">
<source media="(max-width: 480px)" srcset="https://static.rerun.io/stdio/25c5aba992d4c8b3861386d8d9539a4823dca117/480w.png">
<source media="(max-width: 768px)" srcset="https://static.rerun.io/stdio/25c5aba992d4c8b3861386d8d9539a4823dca117/768w.png">
<source media="(max-width: 1024px)" srcset="https://static.rerun.io/stdio/25c5aba992d4c8b3861386d8d9539a4823dca117/1024w.png">
<source media="(max-width: 1200px)" srcset="https://static.rerun.io/stdio/25c5aba992d4c8b3861386d8d9539a4823dca117/1200w.png">
</picture>

Demonstrates how to log data to standard output with the Rerun SDK, and then visualize it from standard input with the Rerun Viewer.

```bash
echo 'hello from stdin!' | cargo run | rerun -
```
20 changes: 20 additions & 0 deletions examples/rust/stdio/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
//! Demonstrates how to log data to standard output with the Rerun SDK, and then visualize it
//! from standard input with the Rerun Viewer.
//!
//! Usage:
//! ```text
//! echo 'hello from stdin!' | cargo run | rerun -
//! ```

fn main() -> Result<(), Box<dyn std::error::Error>> {
let rec = rerun::RecordingStreamBuilder::new("rerun_example_stdio").stdout()?;

let input = std::io::stdin()
.lines()
.collect::<Result<Vec<_>, _>>()?
.join("\n");

rec.log("stdin", &rerun::TextDocument::new(input))?;

Ok(())
}

0 comments on commit 6e49d84

Please sign in to comment.