diff --git a/Cargo.lock b/Cargo.lock index b9a04f1dcee7..e5c320456326 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -653,6 +653,17 @@ dependencies = [ "zbus", ] +[[package]] +name = "atty" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" +dependencies = [ + "hermit-abi 0.1.19", + "libc", + "winapi", +] + [[package]] name = "autocfg" version = "1.1.0" @@ -2532,6 +2543,15 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" +[[package]] +name = "hermit-abi" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" +dependencies = [ + "libc", +] + [[package]] name = "hermit-abi" version = "0.2.6" @@ -4768,6 +4788,7 @@ version = "0.12.0-alpha.1+dev" dependencies = [ "ahash 0.8.6", "anyhow", + "atty", "crossbeam", "document-features", "itertools 0.11.0", @@ -6145,6 +6166,13 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "stdio" +version = "0.12.0-alpha.1+dev" +dependencies = [ + "rerun", +] + [[package]] name = "str-buf" version = "1.0.6" diff --git a/crates/re_data_source/src/load_stdin.rs b/crates/re_data_source/src/load_stdin.rs index 5cf6eb9c6836..01064498ef35 100644 --- a/crates/re_data_source/src/load_stdin.rs +++ b/crates/re_data_source/src/load_stdin.rs @@ -4,7 +4,7 @@ use re_smart_channel::Sender; /// Asynchronously loads RRD data streaming in from standard input. /// /// This fails synchronously iff the standard input stream could not be opened, otherwise errors -/// are handlded asynchronously (as in: they're logged). +/// are handled asynchronously (as in: they're logged). pub fn load_stdin(tx: Sender) -> anyhow::Result<()> { let version_policy = re_log_encoding::decoder::VersionPolicy::Warn; diff --git a/crates/re_log_encoding/src/file_sink.rs b/crates/re_log_encoding/src/file_sink.rs index dfc721e247bc..38bc0b802064 100644 --- a/crates/re_log_encoding/src/file_sink.rs +++ b/crates/re_log_encoding/src/file_sink.rs @@ -43,7 +43,9 @@ pub struct FileSink { join_handle: Option>, /// Only used for diagnostics, not for access after `new()`. - path: PathBuf, + /// + /// `None` indicates stdout. + path: Option, } impl Drop for FileSink { @@ -73,40 +75,31 @@ impl FileSink { let file = std::fs::File::create(&path) .map_err(|err| FileSinkError::CreateFile(path.clone(), err))?; - let mut encoder = crate::encoder::Encoder::new(encoding_options, file)?; - - let join_handle = std::thread::Builder::new() - .name("file_writer".into()) - .spawn({ - let path = path.clone(); - move || { - while let Ok(Some(cmd)) = rx.recv() { - match cmd { - Command::Send(log_msg) => { - if let Err(err) = encoder.append(&log_msg) { - re_log::error!("Failed to save log stream to {path:?}: {err}"); - return; - } - } - Command::Flush(oneshot) => { - re_log::trace!("Flushing…"); - if let Err(err) = encoder.flush_blocking() { - re_log::error!("Failed to flush log stream to {path:?}: {err}"); - return; - } - drop(oneshot); // signals the oneshot - } - } - } - re_log::debug!("Log stream saved to {path:?}"); - } - }) - .map_err(FileSinkError::SpawnThread)?; + let encoder = crate::encoder::Encoder::new(encoding_options, file)?; + let join_handle = spawn_and_stream(Some(&path), encoder, rx)?; + + Ok(Self { + tx: tx.into(), + join_handle: Some(join_handle), + path: Some(path), + }) + } + + /// Start writing log messages to standard output. + pub fn stdout() -> Result { + let encoding_options = crate::EncodingOptions::COMPRESSED; + + let (tx, rx) = std::sync::mpsc::channel(); + + re_log::debug!("Writing to stdout…"); + + let encoder = crate::encoder::Encoder::new(encoding_options, std::io::stdout())?; + let join_handle = spawn_and_stream(None, encoder, rx)?; Ok(Self { tx: tx.into(), join_handle: Some(join_handle), - path, + path: None, }) } @@ -123,10 +116,52 @@ impl FileSink { } } +/// Set `filepath` to `None` to stream to standard output. +fn spawn_and_stream( + filepath: Option<&std::path::Path>, + mut encoder: crate::encoder::Encoder, + rx: Receiver>, +) -> Result, FileSinkError> { + let (name, target) = if let Some(filepath) = filepath { + ("file_writer", filepath.display().to_string()) + } else { + ("stdout_writer", "stdout".to_owned()) + }; + std::thread::Builder::new() + .name(name.into()) + .spawn({ + move || { + while let Ok(Some(cmd)) = rx.recv() { + match cmd { + Command::Send(log_msg) => { + if let Err(err) = encoder.append(&log_msg) { + re_log::error!("Failed to write log stream to {target}: {err}"); + return; + } + } + Command::Flush(oneshot) => { + re_log::trace!("Flushing…"); + if let Err(err) = encoder.flush_blocking() { + re_log::error!("Failed to flush log stream to {target}: {err}"); + return; + } + drop(oneshot); // signals the oneshot + } + } + } + re_log::debug!("Log stream written to {target}"); + } + }) + .map_err(FileSinkError::SpawnThread) +} + impl fmt::Debug for FileSink { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("FileSink") - .field("path", &self.path) + .field( + "path", + &self.path.as_ref().cloned().unwrap_or("stdout".into()), + ) .finish_non_exhaustive() } } diff --git a/crates/re_sdk/Cargo.toml b/crates/re_sdk/Cargo.toml index 66fddc4d2ebd..a035a3f97f29 100644 --- a/crates/re_sdk/Cargo.toml +++ b/crates/re_sdk/Cargo.toml @@ -51,6 +51,7 @@ re_sdk_comms = { workspace = true, features = ["client"] } re_types_core.workspace = true ahash.workspace = true +atty.workspace = true crossbeam.workspace = true document-features.workspace = true once_cell.workspace = true diff --git a/crates/re_sdk/src/recording_stream.rs b/crates/re_sdk/src/recording_stream.rs index 9110c70e33ad..1b8f3b00fb89 100644 --- a/crates/re_sdk/src/recording_stream.rs +++ b/crates/re_sdk/src/recording_stream.rs @@ -354,6 +354,38 @@ impl RecordingStreamBuilder { } } + /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to stdout. + /// + /// If there isn't any listener at the other end of the pipe, the [`RecordingStream`] will + /// default back to `buffered` mode, in order not to break the user's terminal. + /// + /// ## Example + /// + /// ```no_run + /// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app").stdout()?; + /// # Ok::<(), Box>(()) + /// ``` + #[cfg(not(target_arch = "wasm32"))] + pub fn stdout(self) -> RecordingStreamResult { + let is_stdout_listening = !atty::is(atty::Stream::Stdout); + if !is_stdout_listening { + return self.buffered(); + } + + let (enabled, store_info, batcher_config) = self.into_args(); + + if enabled { + RecordingStream::new( + store_info, + batcher_config, + Box::new(crate::sink::FileSink::stdout()?), + ) + } else { + re_log::debug!("Rerun disabled - call to stdout() ignored"); + Ok(RecordingStream::disabled()) + } + } + /// Spawns a new Rerun Viewer process from an executable available in PATH, then creates a new /// [`RecordingStream`] that is pre-configured to stream the data through to that viewer over TCP. /// @@ -1323,6 +1355,32 @@ impl RecordingStream { Ok(()) } + /// Swaps the underlying sink for a [`crate::sink::FileSink`] pointed at stdout. + /// + /// If there isn't any listener at the other end of the pipe, the [`RecordingStream`] will + /// default back to `buffered` mode, in order not to break the user's terminal. + /// + /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in + /// terms of data durability and ordering. + /// See [`Self::set_sink`] for more information. + pub fn stdout(&self) -> Result<(), crate::sink::FileSinkError> { + if forced_sink_path().is_some() { + re_log::debug!("Ignored setting new file since _RERUN_FORCE_SINK is set"); + return Ok(()); + } + + let is_stdout_listening = !atty::is(atty::Stream::Stdout); + if !is_stdout_listening { + self.set_sink(Box::new(crate::log_sink::BufferedSink::new())); + return Ok(()); + } + + let sink = crate::sink::FileSink::stdout()?; + self.set_sink(Box::new(sink)); + + Ok(()) + } + /// Swaps the underlying sink for a [`crate::sink::BufferedSink`]. /// /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in diff --git a/docs/content/reference/sdk-operating-modes.md b/docs/content/reference/sdk-operating-modes.md index 7aa8cd2098ab..b8f543b7162c 100644 --- a/docs/content/reference/sdk-operating-modes.md +++ b/docs/content/reference/sdk-operating-modes.md @@ -75,6 +75,18 @@ Use [`rr.save`](https://ref.rerun.io/docs/python/stable/common/initialization_fu #### Rust Use [`RecordingStream::save`](https://docs.rs/rerun/latest/rerun/struct.RecordingStream.html#method.save). + +## Standard Input/Output + +Streams all logging data to standard output, which can then be loaded by the Rerun Viewer by streaming it from standard input. + +#### Rust + +Use [`RecordingStream::stdout`](https://docs.rs/rerun/latest/rerun/struct.RecordingStream.html#method.stdout?speculative-link). + +Check out our [dedicated example](https://github.com/rerun-io/rerun/tree/latest/examples/rust/stdio/src/main.rs?speculative-link). + + ## Adding the standard flags to your programs We provide helpers for both Python & Rust to effortlessly add and properly handle all of these flags in your programs. diff --git a/examples/rust/stdio/Cargo.toml b/examples/rust/stdio/Cargo.toml new file mode 100644 index 000000000000..5d77461e6a7b --- /dev/null +++ b/examples/rust/stdio/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "stdio" +version = "0.12.0-alpha.1+dev" +edition = "2021" +rust-version = "1.72" +license = "MIT OR Apache-2.0" +publish = false + +[dependencies] +rerun = { path = "../../../crates/rerun" } diff --git a/examples/rust/stdio/README.md b/examples/rust/stdio/README.md new file mode 100644 index 000000000000..01332218b8d9 --- /dev/null +++ b/examples/rust/stdio/README.md @@ -0,0 +1,21 @@ +--- +title: Standard Input/Output example +python: https://github.com/rerun-io/rerun/tree/latest/examples/python/stdio/main.py?speculative-link +rust: https://github.com/rerun-io/rerun/tree/latest/examples/rust/stdio/src/main.rs?speculative-link +cpp: https://github.com/rerun-io/rerun/tree/latest/examples/cpp/stdio/main.cpp?speculative-link +thumbnail: https://static.rerun.io/stdio/25c5aba992d4c8b3861386d8d9539a4823dca117/480w.png +--- + + + + + + + + + +Demonstrates how to log data to standard output with the Rerun SDK, and then visualize it from standard input with the Rerun Viewer. + +```bash +echo 'hello from stdin!' | cargo run | rerun - +``` diff --git a/examples/rust/stdio/src/main.rs b/examples/rust/stdio/src/main.rs new file mode 100644 index 000000000000..887a208f1b16 --- /dev/null +++ b/examples/rust/stdio/src/main.rs @@ -0,0 +1,20 @@ +//! Demonstrates how to log data to standard output with the Rerun SDK, and then visualize it +//! from standard input with the Rerun Viewer. +//! +//! Usage: +//! ```text +//! echo 'hello from stdin!' | cargo run | rerun - +//! ``` + +fn main() -> Result<(), Box> { + let rec = rerun::RecordingStreamBuilder::new("rerun_example_stdio").stdout()?; + + let input = std::io::stdin() + .lines() + .collect::, _>>()? + .join("\n"); + + rec.log("stdin", &rerun::TextDocument::new(input))?; + + Ok(()) +}