Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spawn via $PATH 1: Rust implementation #3996

Merged
merged 10 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions crates/re_sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@
mod global;
mod log_sink;
mod recording_stream;
mod spawn;

#[cfg(feature = "log")]
mod log_integration;

// -------------
// Public items:

pub use spawn::{spawn, SpawnError, SpawnOptions};

pub use self::recording_stream::{
RecordingStream, RecordingStreamBuilder, RecordingStreamError, RecordingStreamResult,
};
Expand Down
148 changes: 148 additions & 0 deletions crates/re_sdk/src/recording_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ pub enum RecordingStreamError {
err: std::io::Error,
},

/// Error spawning a Rerun Viewer process.
#[error(transparent)] // makes bubbling all the way up to main look nice
SpawnViewer(#[from] crate::SpawnError),

/// Failure to host a web viewer and/or Rerun server.
#[cfg(feature = "web_viewer")]
#[error(transparent)]
Expand Down Expand Up @@ -311,6 +315,67 @@ impl RecordingStreamBuilder {
}
}

/// Spawns a new Rerun Viewer process from an executable available in PATH, then creates a new
/// [`RecordingStream`] that is pre-configured to stream the data through to that viewer over TCP.
///
/// If a Rerun Viewer is already listening on this TCP port, the stream will be redirected to
/// that viewer instead of starting a new one.
///
/// `flush_timeout` is the minimum time the [`TcpSink`][`crate::log_sink::TcpSink`] will
/// wait during a flush before potentially dropping data. Note: Passing `None` here can cause a
/// call to `flush` to block indefinitely if a connection cannot be established.
///
/// ## Example
///
/// ```no_run
/// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app").spawn(re_sdk::default_flush_timeout())?;
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```
pub fn spawn(
self,
flush_timeout: Option<std::time::Duration>,
) -> RecordingStreamResult<RecordingStream> {
self.spawn_opts(&Default::default(), flush_timeout)
}

/// Spawns a new Rerun Viewer process from an executable available in PATH, then creates a new
/// [`RecordingStream`] that is pre-configured to stream the data through to that viewer over TCP.
///
/// If a Rerun Viewer is already listening on this TCP port, the stream will be redirected to
/// that viewer instead of starting a new one.
///
/// The behavior of the spawned Viewer can be configured via `opts`.
/// If you're fine with the default behavior, refer to the simpler [`Self::spawn`].
///
/// `flush_timeout` is the minimum time the [`TcpSink`][`crate::log_sink::TcpSink`] will
/// wait during a flush before potentially dropping data. Note: Passing `None` here can cause a
/// call to `flush` to block indefinitely if a connection cannot be established.
///
/// ## Example
///
/// ```no_run
/// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app")
/// .spawn_opts(&re_sdk::SpawnOptions::default(), re_sdk::default_flush_timeout())?;
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```
pub fn spawn_opts(
self,
opts: &crate::SpawnOptions,
flush_timeout: Option<std::time::Duration>,
) -> RecordingStreamResult<RecordingStream> {
let connect_addr = opts.connect_addr();

// NOTE: If `_RERUN_TEST_FORCE_SAVE` is set, all recording streams will write to disk no matter
// what, thus spawning a viewer is pointless (and probably not intended).
if forced_sink_path().is_some() {
return self.connect(connect_addr, flush_timeout);
}

spawn(opts)?;

self.connect(connect_addr, flush_timeout)
}

/// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
/// web-based Rerun viewer via WebSockets.
///
Expand Down Expand Up @@ -1104,6 +1169,58 @@ impl RecordingStream {
self.set_sink(Box::new(crate::log_sink::TcpSink::new(addr, flush_timeout)));
}

/// Spawns a new Rerun Viewer process from an executable available in PATH, then swaps the
/// underlying sink for a [`crate::log_sink::TcpSink`] sink pre-configured to send data to that
/// new process.
///
/// If a Rerun Viewer is already listening on this TCP port, the stream will be redirected to
/// that viewer instead of starting a new one.
///
/// `flush_timeout` is the minimum time the [`TcpSink`][`crate::log_sink::TcpSink`] will
/// wait during a flush before potentially dropping data. Note: Passing `None` here can cause a
/// call to `flush` to block indefinitely if a connection cannot be established.
///
/// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
/// terms of data durability and ordering.
/// See [`Self::set_sink`] for more information.
pub fn spawn(&self, flush_timeout: Option<std::time::Duration>) -> RecordingStreamResult<()> {
self.spawn_opts(&Default::default(), flush_timeout)
}

/// Spawns a new Rerun Viewer process from an executable available in PATH, then swaps the
/// underlying sink for a [`crate::log_sink::TcpSink`] sink pre-configured to send data to that
/// new process.
///
/// If a Rerun Viewer is already listening on this TCP port, the stream will be redirected to
/// that viewer instead of starting a new one.
///
/// The behavior of the spawned Viewer can be configured via `opts`.
/// If you're fine with the default behavior, refer to the simpler [`Self::spawn`].
///
/// `flush_timeout` is the minimum time the [`TcpSink`][`crate::log_sink::TcpSink`] will
/// wait during a flush before potentially dropping data. Note: Passing `None` here can cause a
/// call to `flush` to block indefinitely if a connection cannot be established.
///
/// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
/// terms of data durability and ordering.
/// See [`Self::set_sink`] for more information.
pub fn spawn_opts(
&self,
opts: &crate::SpawnOptions,
flush_timeout: Option<std::time::Duration>,
) -> RecordingStreamResult<()> {
if forced_sink_path().is_some() {
re_log::debug!("Ignored setting new TcpSink since _RERUN_FORCE_SINK is set");
return Ok(());
}

spawn(opts)?;

self.connect(opts.connect_addr(), flush_timeout);

Ok(())
}
teh-cmc marked this conversation as resolved.
Show resolved Hide resolved

/// Swaps the underlying sink for a [`crate::sink::MemorySink`] sink and returns the associated
/// [`MemorySinkStorage`].
///
Expand Down Expand Up @@ -1177,6 +1294,37 @@ impl fmt::Debug for RecordingStream {
}
}

/// Helper to deduplicate spawn logic across [`RecordingStreamBuilder`] & [`RecordingStream`].
fn spawn(opts: &crate::SpawnOptions) -> RecordingStreamResult<()> {
use std::{net::TcpStream, time::Duration};

let connect_addr = opts.connect_addr();

// TODO(#4019): application-level handshake
if TcpStream::connect_timeout(&connect_addr, Duration::from_secs(1)).is_ok() {
re_log::info!(
addr = %opts.listen_addr(),
"A process is already listening at this address. Trying to connect instead."
);
} else {
crate::spawn(opts)?;

// Give the newly spawned Rerun Viewer some time to bind.
//
// NOTE: The timeout only covers the TCP handshake: if no process is bound to that address
// at all, the connection will fail immediately, irrelevant of the timeout configuration.
// For that reason we use an extra loop.
for _ in 0..5 {
if TcpStream::connect_timeout(&connect_addr, Duration::from_secs(1)).is_ok() {
break;
}
std::thread::sleep(Duration::from_millis(100));
}
}

Ok(())
}

// --- Stateful time ---

/// Thread-local data.
Expand Down
190 changes: 190 additions & 0 deletions crates/re_sdk/src/spawn.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
/// Options to control the behavior of [`spawn`].
///
/// Refer to the field-level documentation for more information about each individual options.
///
/// The defaults are ok for most use cases: `SpawnOptions::default()`.
/// Use the partial-default pattern to customize them further:
/// ```no_run
/// let opts = re_sdk::SpawnOptions {
/// port: 1234,
/// memory_limit: "25%".into(),
/// ..Default::default()
/// };
/// ```
#[derive(Debug, Clone)]
pub struct SpawnOptions {
/// The port to listen on.
///
/// Defaults to `9876`.
pub port: u16,

/// An upper limit on how much memory the Rerun Viewer should use.
/// When this limit is reached, Rerun will drop the oldest data.
/// Example: `16GB` or `50%` (of system total).
///
/// Defaults to `75%`.
pub memory_limit: String,

/// Specifies the name of the Rerun executable.
///
/// You can omit the `.exe` suffix on Windows.
///
/// Defaults to `rerun`.
pub executable_name: String,

/// Enforce a specific executable to use instead of searching though PATH
/// for [`Self::executable_name`].
///
/// Unspecified by default.
pub executable_path: Option<String>,

/// Extra arguments that will be passed as-is to the Rerun Viewer process.
pub extra_args: Vec<String>,
}

// NOTE: No need for .exe extension on windows.
const RERUN_BINARY: &str = "rerun";

impl Default for SpawnOptions {
fn default() -> Self {
Self {
port: crate::default_server_addr().port(),
memory_limit: "75%".into(),
executable_name: RERUN_BINARY.into(),
executable_path: None,
extra_args: Vec::new(),
}
}
}

impl SpawnOptions {
/// Resolves the final connect address value.
pub fn connect_addr(&self) -> std::net::SocketAddr {
std::net::SocketAddr::new("127.0.0.1".parse().unwrap(), self.port)
}

/// Resolves the final listen address value.
pub fn listen_addr(&self) -> std::net::SocketAddr {
std::net::SocketAddr::new("0.0.0.0".parse().unwrap(), self.port)
}

/// Resolves the final executable path.
pub fn executable_path(&self) -> String {
if let Some(path) = self.executable_path.as_deref() {
return path.to_owned();
}

self.executable_name.clone()
}
}

/// Errors that can occur when [`spawn`]ing a Rerun Viewer.
#[derive(thiserror::Error)]
pub enum SpawnError {
/// Failed to find Rerun Viewer executable in PATH.
#[error("Failed to find Rerun Viewer executable in PATH.\n{message}\nPATH={search_path:?}")]
ExecutableNotFoundInPath {
/// High-level error message meant to be printed to the user (install tips etc).
message: String,

/// Name used for the executable search.
executable_name: String,

/// Value of the `PATH` environment variable, if any.
search_path: String,
},

/// Failed to find Rerun Viewer executable at explicit path.
#[error("Failed to find Rerun Viewer executable at {executable_path:?}")]
ExecutableNotFound {
/// Explicit path of the executable (specified by the caller).
executable_path: String,
},

/// Other I/O error.
#[error("Failed to spawn the Rerun Viewer process: {0}")]
Io(#[from] std::io::Error),
}

impl std::fmt::Debug for SpawnError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// Due to how recording streams are initialized in practice, most of the time `SpawnError`s
// will bubble all the way up to `main` and crash the program, which will call into the
// `Debug` implementation.
//
// Spawn errors include a user guide, and so we need them to render in a nice way.
// Hence we redirect the debug impl to the display impl generated by `thiserror`.
<Self as std::fmt::Display>::fmt(self, f)
}
}

/// Spawns a new Rerun Viewer process ready to listen for TCP connections.
teh-cmc marked this conversation as resolved.
Show resolved Hide resolved
teh-cmc marked this conversation as resolved.
Show resolved Hide resolved
///
/// If there is already a process listening on this port (Rerun or not), this function returns `Ok`
/// WITHOUT spawning a `rerun` process (!).
///
/// Refer to [`SpawnOptions`]'s documentation for configuration options.
///
/// This only starts a Viewer process: if you'd like to connect to it and start sending data, refer
/// to [`crate::RecordingStream::connect`] or use [`crate::RecordingStream::spawn`] directly.
pub fn spawn(opts: &SpawnOptions) -> Result<(), SpawnError> {
use std::{net::TcpStream, process::Command, time::Duration};

// NOTE: It's indented on purpose, it just looks better and reads easier.
const EXECUTABLE_NOT_FOUND: &str = //
"
You can install binary releases of the Rerun Viewer:
* Using `cargo`: `cargo binstall rerun-cli` (see https://github.com/cargo-bins/cargo-binstall)
* Via direct download from our release assets: https://github.com/rerun-io/rerun/releases/latest/
* Using `pip`: `pip3 install rerun-sdk` (warning: pip version has slower start times!)

For more information, refer to our complete install documentation over at:
https://rerun.io/docs/getting-started/installing-viewer
";

let port = opts.port;
let connect_addr = opts.connect_addr();
let memory_limit = &opts.memory_limit;
let executable_path = opts.executable_path();

// TODO(#4019): application-level handshake
if TcpStream::connect_timeout(&connect_addr, Duration::from_secs(1)).is_ok() {
re_log::info!(
addr = %opts.listen_addr(),
"A process is already listening at this address. Assuming it's a Rerun Viewer."
);
return Ok(());
}

let res = Command::new(executable_path)
.arg(format!("--port={port}"))
.arg(format!("--memory-limit={memory_limit}"))
.arg("--skip-welcome-screen")
.args(opts.extra_args.clone())
.spawn();

let rerun_bin = match res {
Ok(rerun_bin) => rerun_bin,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
return if let Some(executable_path) = opts.executable_path.as_ref() {
Err(SpawnError::ExecutableNotFound {
executable_path: executable_path.clone(),
})
} else {
Err(SpawnError::ExecutableNotFoundInPath {
message: EXECUTABLE_NOT_FOUND.to_owned(),
executable_name: opts.executable_name.clone(),
search_path: std::env::var("PATH").unwrap_or_else(|_| String::new()),
})
}
}
Err(err) => {
return Err(err.into());
}
};

// Simply forget about the child process, we want it to outlive the parent process if needed.
_ = rerun_bin;

Ok(())
}
Loading
Loading