Skip to content

Commit

Permalink
Standard input/output support 1: stream RRD data from stdin (#4511)
Browse files Browse the repository at this point in the history
Add a `stdin` source to the viewer so it can stream data in from
standard input.

We make `stdin` an explicit input (`rerun -`) like in the good old days,
to avoid shady behavior in non-TTY and other esoteric environment (crazy
WSL setups, etc). Trust me, it gets quite insane...

Checks:
- [x] `rerun` works
- [x] `rerun -` blocks
- [x] `rerun - < data.rrd` works
- [x] `cat data.rrd | rerun -` works
- [x] `rerun < data.rrd` works but does nothing
- [x] `cat data.rrd | rerun` works but does nothing
- [x] no-TTY & other esoteric environments don't go completely off the
rails

---

Part of a small PR series to add stdio streaming support to our Viewer
and SDKs:
- #4511
- #4512 
- #4513
- #4514
  • Loading branch information
teh-cmc authored Dec 14, 2023
1 parent 0eaad73 commit 022dcde
Show file tree
Hide file tree
Showing 11 changed files with 90 additions and 5 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ array-init = "2.1"
arrow2 = "0.17"
arrow2_convert = "0.5.0"
async-executor = "1.0"
atty = "0.2"
backtrace = "0.3"
bincode = "1.3"
bitflags = { version = "2.4", features = ["bytemuck"] }
Expand Down
30 changes: 30 additions & 0 deletions crates/re_data_source/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ pub enum DataSource {

/// A remote Rerun server.
WebSocketAddr(String),

// RRD data streaming in from standard input.
#[cfg(not(target_arch = "wasm32"))]
Stdin,
}

impl DataSource {
Expand Down Expand Up @@ -65,6 +69,16 @@ impl DataSource {
}
}

// Reading from standard input in non-TTY environments (e.g. GitHub Actions, but I'm sure we can
// come up with more convoluted than that…) can lead to many unexpected,
// platform-specific problems that aren't even necessarily consistent across runs.
//
// In order to avoid having to swallow errors based on unreliable heuristics (or inversely:
// throwing errors when we shouldn't), we just make reading from standard input explicit.
if uri == "-" {
return DataSource::Stdin;
}

let path = std::path::Path::new(&uri).to_path_buf();

if uri.starts_with("file://") || path.exists() {
Expand Down Expand Up @@ -147,6 +161,22 @@ impl DataSource {
DataSource::WebSocketAddr(rerun_server_ws_url) => {
crate::web_sockets::connect_to_ws_url(&rerun_server_ws_url, on_msg)
}

#[cfg(not(target_arch = "wasm32"))]
DataSource::Stdin => {
let (tx, rx) = re_smart_channel::smart_channel(
SmartMessageSource::Stdin,
SmartChannelSource::Stdin,
);

crate::load_stdin::load_stdin(tx).with_context(|| "stdin".to_owned())?;

if let Some(on_msg) = on_msg {
on_msg();
}

Ok(rx)
}
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions crates/re_data_source/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ mod load_file;
mod load_file_contents;
mod web_sockets;

#[cfg(not(target_arch = "wasm32"))]
mod load_stdin;

#[cfg(not(target_arch = "wasm32"))]
mod load_file_path;

Expand Down
33 changes: 33 additions & 0 deletions crates/re_data_source/src/load_stdin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use re_log_types::LogMsg;
use re_smart_channel::Sender;

/// Asynchronously loads RRD data streaming in from standard input.
///
/// This fails synchronously iff the standard input stream could not be opened, otherwise errors
/// are handlded asynchronously (as in: they're logged).
pub fn load_stdin(tx: Sender<LogMsg>) -> anyhow::Result<()> {
let version_policy = re_log_encoding::decoder::VersionPolicy::Warn;

let decoder = re_log_encoding::decoder::Decoder::new(version_policy, std::io::stdin())?;

rayon::spawn(move || {
re_tracing::profile_scope!("stdin");

for msg in decoder {
let msg = match msg {
Ok(msg) => msg,
Err(err) => {
re_log::warn_once!("Failed to decode message in stdin: {err}");
continue;
}
};
if tx.send(msg).is_err() {
break; // The other end has decided to hang up, not our problem.
}
}

tx.quit(None).ok(); // The other end has decided to hang up, not our problem.
});

Ok(())
}
9 changes: 8 additions & 1 deletion crates/re_smart_channel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,15 @@ pub enum SmartChannelSource {
///
/// We are a TCP server listening on this port.
TcpServer { port: u16 },

/// The channel was created in the context of streaming in RRD data from standard input.
Stdin,
}

impl SmartChannelSource {
pub fn is_network(&self) -> bool {
match self {
Self::File(_) | Self::Sdk | Self::RrdWebEventListener => false,
Self::File(_) | Self::Sdk | Self::RrdWebEventListener | Self::Stdin => false,
Self::RrdHttpStream { .. } | Self::WsClient { .. } | Self::TcpServer { .. } => true,
}
}
Expand Down Expand Up @@ -103,6 +106,9 @@ pub enum SmartMessageSource {
// reason.
addr: Option<std::net::SocketAddr>,
},

/// The data is streaming in from standard input.
Stdin,
}

impl std::fmt::Display for SmartMessageSource {
Expand All @@ -118,6 +124,7 @@ impl std::fmt::Display for SmartMessageSource {
"tcp://{}",
addr.map_or_else(|| "(unknown ip)".to_owned(), |addr| addr.to_string())
),
SmartMessageSource::Stdin => "stdin".into(),
})
}
}
Expand Down
4 changes: 3 additions & 1 deletion crates/re_viewer/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -996,7 +996,9 @@ impl App {
for source in sources {
match &*source {
// No need for a welcome screen - data is coming soon!
SmartChannelSource::File(_) | SmartChannelSource::RrdHttpStream { .. } => {
SmartChannelSource::File(_)
| SmartChannelSource::RrdHttpStream { .. }
| SmartChannelSource::Stdin => {
return false;
}

Expand Down
3 changes: 2 additions & 1 deletion crates/re_viewer/src/app_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,8 @@ fn recording_config_entry<'cfgs>(
// Live data - follow it!
re_smart_channel::SmartChannelSource::Sdk
| re_smart_channel::SmartChannelSource::WsClient { .. }
| re_smart_channel::SmartChannelSource::TcpServer { .. } => PlayState::Following,
| re_smart_channel::SmartChannelSource::TcpServer { .. }
| re_smart_channel::SmartChannelSource::Stdin => PlayState::Following,
}
} else {
PlayState::Following // No known source 🤷‍♂️
Expand Down
4 changes: 3 additions & 1 deletion crates/re_viewer/src/ui/recordings_panel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ fn loading_receivers_ui(
SmartChannelSource::RrdWebEventListener
| SmartChannelSource::Sdk
| SmartChannelSource::WsClient { .. }
| SmartChannelSource::TcpServer { .. } => {
| SmartChannelSource::TcpServer { .. }
| SmartChannelSource::Stdin => {
// These show up in the top panel - see `top_panel.rs`.
continue;
}
Expand Down Expand Up @@ -294,6 +295,7 @@ fn data_source_string(data_source: &re_smart_channel::SmartChannelSource) -> Str
SmartChannelSource::Sdk => "SDK".to_owned(),
SmartChannelSource::WsClient { ws_server_url } => ws_server_url.clone(),
SmartChannelSource::TcpServer { port } => format!("TCP Server, port {port}"),
SmartChannelSource::Stdin => "Standard Input".to_owned(),
}
}

Expand Down
6 changes: 5 additions & 1 deletion crates/re_viewer/src/ui/top_panel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ fn connection_status_ui(ui: &mut egui::Ui, rx: &ReceiveSet<re_log_types::LogMsg>
.into_iter()
.filter(|source| {
match source.as_ref() {
SmartChannelSource::File(_) | SmartChannelSource::RrdHttpStream { .. } => {
SmartChannelSource::File(_)
| SmartChannelSource::RrdHttpStream { .. }
| SmartChannelSource::Stdin => {
false // These show up in the recordings panel as a "Loading…" in `recordings_panel.rs`
}

Expand Down Expand Up @@ -135,6 +137,7 @@ fn connection_status_ui(ui: &mut egui::Ui, rx: &ReceiveSet<re_log_types::LogMsg>

let tooltip = match source {
SmartChannelSource::File(_)
| SmartChannelSource::Stdin
| SmartChannelSource::RrdHttpStream { .. }
| SmartChannelSource::RrdWebEventListener
| SmartChannelSource::Sdk
Expand All @@ -157,6 +160,7 @@ fn connection_status_ui(ui: &mut egui::Ui, rx: &ReceiveSet<re_log_types::LogMsg>
re_smart_channel::SmartChannelSource::File(path) => {
format!("Loading {}…", path.display())
}
re_smart_channel::SmartChannelSource::Stdin => "Loading stdin…".to_owned(),
re_smart_channel::SmartChannelSource::RrdHttpStream { url } => {
format!("Loading {url}…")
}
Expand Down
1 change: 1 addition & 0 deletions crates/re_viewer/src/viewer_analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ impl ViewerAnalytics {
re_smart_channel::SmartChannelSource::Sdk => "sdk", // show()
re_smart_channel::SmartChannelSource::WsClient { .. } => "ws_client", // spawn()
re_smart_channel::SmartChannelSource::TcpServer { .. } => "tcp_server", // connect()
re_smart_channel::SmartChannelSource::Stdin => "stdin",
};
self.register("data_source", data_source);
}
Expand Down
1 change: 1 addition & 0 deletions crates/rerun/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ pub mod external {
pub use anyhow;

pub use re_build_info;
pub use re_data_source;
pub use re_data_store;
pub use re_data_store::external::*;
pub use re_format;
Expand Down

0 comments on commit 022dcde

Please sign in to comment.