Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spawn via $PATH 1: Rust implementation #3996

Merged
merged 10 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions crates/re_sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@
mod global;
mod log_sink;
mod recording_stream;
mod spawn;

#[cfg(feature = "log")]
mod log_integration;

// -------------
// Public items:

pub use spawn::{spawn, SpawnOptions};

pub use self::recording_stream::{
RecordingStream, RecordingStreamBuilder, RecordingStreamError, RecordingStreamResult,
};
Expand Down
100 changes: 100 additions & 0 deletions crates/re_sdk/src/recording_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ pub enum RecordingStreamError {
err: std::io::Error,
},

/// Error spawning a Rerun Viewer process.
#[error("Failed to spawn Rerun Viewer process: {0}")]
SpawnViewer(std::io::Error),

/// Failure to host a web viewer and/or Rerun server.
#[cfg(feature = "web_viewer")]
#[error(transparent)]
Expand Down Expand Up @@ -311,6 +315,55 @@ impl RecordingStreamBuilder {
}
}

/// Spawns a new Rerun Viewer process from an executable available in PATH, then creates a new
/// [`RecordingStream`] that is pre-configured to stream the data through to that viewer over TCP.
///
/// `flush_timeout` is the minimum time the [`TcpSink`][`crate::log_sink::TcpSink`] will
/// wait during a flush before potentially dropping data. Note: Passing `None` here can cause a
/// call to `flush` to block indefinitely if a connection cannot be established.
///
/// ## Example
///
/// ```no_run
/// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app")
/// .spawn(&re_sdk::SpawnOptions::default(), re_sdk::default_flush_timeout())?;
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```
pub fn spawn(
self,
opts: &crate::SpawnOptions,
flush_timeout: Option<std::time::Duration>,
) -> RecordingStreamResult<RecordingStream> {
use std::{net::TcpStream, time::Duration};

let connect_addr = opts.connect_addr();

// NOTE: If `_RERUN_TEST_FORCE_SAVE` is set, all recording streams will write to disk no matter
// what, thus spawning a viewer is pointless (and probably not intended).
if forced_sink_path().is_some() {
return self.connect(connect_addr, flush_timeout);
}

if TcpStream::connect_timeout(&connect_addr, Duration::from_millis(1)).is_ok() {
re_log::info!(
addr = %opts.listen_addr(),
"A process is already listening at this address. Trying to connect instead."
);
} else {
crate::spawn(opts).map_err(RecordingStreamError::SpawnViewer)?;

// Give the newly spawned Rerun Viewer some time to bind.
for _ in 0..5 {
if TcpStream::connect_timeout(&connect_addr, Duration::from_millis(1)).is_ok() {
break;
}
std::thread::sleep(Duration::from_millis(100));
}
teh-cmc marked this conversation as resolved.
Show resolved Hide resolved
}

self.connect(connect_addr, flush_timeout)
}

/// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
/// web-based Rerun viewer via WebSockets.
///
Expand Down Expand Up @@ -1104,6 +1157,53 @@ impl RecordingStream {
self.set_sink(Box::new(crate::log_sink::TcpSink::new(addr, flush_timeout)));
}

/// Spawns a new Rerun Viewer process from an executable available in PATH, then swaps the
/// underlying sink for a [`crate::log_sink::TcpSink`] sink pre-configured to send data to that
/// new process.
///
/// `flush_timeout` is the minimum time the [`TcpSink`][`crate::log_sink::TcpSink`] will
/// wait during a flush before potentially dropping data. Note: Passing `None` here can cause a
/// call to `flush` to block indefinitely if a connection cannot be established.
///
/// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
/// terms of data durability and ordering.
/// See [`Self::set_sink`] for more information.
pub fn spawn(
&self,
opts: &crate::SpawnOptions,
flush_timeout: Option<std::time::Duration>,
) -> RecordingStreamResult<()> {
use std::{net::TcpStream, time::Duration};

if forced_sink_path().is_some() {
re_log::debug!("Ignored setting new TcpSink since _RERUN_FORCE_SINK is set");
return Ok(());
}

let connect_addr = opts.connect_addr();

if TcpStream::connect_timeout(&connect_addr, Duration::from_millis(1)).is_ok() {
re_log::info!(
addr = %opts.listen_addr(),
"A process is already listening at this address. Trying to connect instead."
);
} else {
crate::spawn(opts).map_err(RecordingStreamError::SpawnViewer)?;

// Give the newly spawned Rerun Viewer some time to bind.
for _ in 0..5 {
if TcpStream::connect_timeout(&connect_addr, Duration::from_millis(1)).is_ok() {
break;
}
std::thread::sleep(Duration::from_millis(100));
}
}

self.connect(connect_addr, flush_timeout);

Ok(())
}
teh-cmc marked this conversation as resolved.
Show resolved Hide resolved

/// Swaps the underlying sink for a [`crate::sink::MemorySink`] sink and returns the associated
/// [`MemorySinkStorage`].
///
Expand Down
166 changes: 166 additions & 0 deletions crates/re_sdk/src/spawn.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/// Options to control the behavior of [`spawn`].
///
/// Refer to the field-level documentation for more information about each individual options.
///
/// The defaults are ok for most use cases: `SpawnOptions::default()`.
/// Use the builder pattern to customize them further:
/// ```no_run
/// let opts = re_sdk::SpawnOptions::default().with_port(1234u16).with_memory_limit("25%");
/// ```
#[derive(Debug, Clone, Default)]
pub struct SpawnOptions {
/// The port to listen on.
///
/// Defaults to `9876` if unspecified.
pub port: Option<u16>,
teh-cmc marked this conversation as resolved.
Show resolved Hide resolved

/// An upper limit on how much memory the Rerun Viewer should use.
/// When this limit is reached, Rerun will drop the oldest data.
/// Example: `16GB` or `50%` (of system total).
///
/// Defaults to `75%` if unspecified.
pub memory_limit: Option<String>,
emilk marked this conversation as resolved.
Show resolved Hide resolved

/// Specifies the name of the Rerun executable.
///
/// You can omit the `.exe` suffix on Windows.
///
/// Defaults to `rerun` if unspecified.
pub executable_name: Option<String>,
teh-cmc marked this conversation as resolved.
Show resolved Hide resolved

/// Enforce a specific executable to use instead of searching though PATH
/// for [`Self::executable_name`].
///
/// Unspecified by default.
pub executable_path: Option<String>,
}

impl SpawnOptions {
/// Refer to field-level documentation.
pub fn with_port(mut self, port: impl Into<u16>) -> Self {
self.port = Some(port.into());
self
}

/// Refer to field-level documentation.
pub fn with_memory_limit(mut self, memory_limit: impl AsRef<str>) -> Self {
self.memory_limit = Some(memory_limit.as_ref().to_owned());
self
}

/// Refer to field-level documentation.
pub fn with_executable_name(mut self, executable_name: impl AsRef<str>) -> Self {
self.executable_name = Some(executable_name.as_ref().to_owned());
self
}

/// Refer to field-level documentation.
pub fn with_executable_path(mut self, executable_path: impl AsRef<str>) -> Self {
self.executable_path = Some(executable_path.as_ref().to_owned());
self
}
}

impl SpawnOptions {
/// Resolves the final port value.
pub fn port(&self) -> u16 {
self.port.unwrap_or(9876)
}

/// Resolves the final connect address value.
pub fn connect_addr(&self) -> std::net::SocketAddr {
std::net::SocketAddr::new("127.0.0.1".parse().unwrap(), self.port())
}

/// Resolves the final listen address value.
pub fn listen_addr(&self) -> std::net::SocketAddr {
std::net::SocketAddr::new("0.0.0.0".parse().unwrap(), self.port())
}

/// Resolves the final memory limit value.
pub fn memory_limit(&self) -> String {
self.memory_limit.as_deref().unwrap_or("75%").to_owned()
}

/// Resolves the final executable path.
pub fn executable_path(&self) -> String {
// NOTE: No need for .exe extension on windows.
const RERUN_BINARY: &str = "rerun";

if let Some(path) = self.executable_path.as_deref() {
return path.to_owned();
}

self.executable_name
.as_deref()
.unwrap_or(RERUN_BINARY)
.to_owned()
}
}

/// Spawns a new Rerun Viewer process ready to listen for TCP connections.
teh-cmc marked this conversation as resolved.
Show resolved Hide resolved
teh-cmc marked this conversation as resolved.
Show resolved Hide resolved
///
/// Refer to [`SpawnOptions`]'s documentation for configuration options.
///
/// This only starts a Viewer process: if you'd like to connect to it and start sending data, refer
/// to [`crate::RecordingStream::connect`] or use [`crate::RecordingStream::spawn`] directly.
pub fn spawn(opts: &SpawnOptions) -> std::io::Result<()> {
teh-cmc marked this conversation as resolved.
Show resolved Hide resolved
use std::{net::TcpStream, process::Command, time::Duration};

// NOTE: It's indented on purpose, it just looks better and reads easier.
const EXECUTABLE_NOT_FOUND: &str = //
"
Couldn't find the Rerun Viewer executable in your PATH.

You can install binary releases of the Rerun Viewer using any of the following methods:
* Binary download with `cargo`: `cargo binstall rerun-cli` (see https://github.com/cargo-bins/cargo-binstall)
* Build from source with `cargo`: `cargo install rerun-cli` (requires Rust 1.72+)
* Direct download from our release assets: https://github.com/rerun-io/rerun/releases/latest/
* Or together with the Rerun Python SDK:
* Pip: `pip3 install rerun-sdk`
* Conda: `conda install -c conda-forge rerun-sdk`
* Binary download with `pixi`: `pixi global install rerun-sdk` (see https://prefix.dev/docs/pixi/overview)
teh-cmc marked this conversation as resolved.
Show resolved Hide resolved

If your platform and/or architecture is not available, you can refer to
https://github.com/rerun-io/rerun/blob/main/BUILD.md for instructions on how to build from source.

Otherwise, feel free to open an issue at https://github.com/rerun-io/rerun/issues if you'd like to
request binary releases for your specific platform.
";

let port = opts.port();
let connect_addr = opts.connect_addr();
let memory_limit = opts.memory_limit();
let executable_path = opts.executable_path();

if TcpStream::connect_timeout(&connect_addr, Duration::from_millis(1)).is_ok() {
re_log::info!(
addr = %opts.listen_addr(),
"A process is already listening at this address. Assuming it's a Rerun Viewer."
);
return Ok(());
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To return Ok when perhaps there is a completely unrelated (non-rerun) process already occupying the port seems like trouble waiting to happen…

To me it makes sense it it is a special error (SpawnError::AddressAlreadyInUse) and have the caller do the assuming instead, but it's not a blocker. At least the logging is loud and clear!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We've used the same trick in the Python SDK for months without issue.
I'll make an issue to introduce a proper Rerun handshake, but this is out of the scope of this PR.


let res = Command::new(executable_path)
.arg(format!("--port={port}"))
.arg(format!("--memory-limit={memory_limit}"))
.arg("--skip-welcome-screen")
.spawn();

let rerun_bin = match res {
Ok(rerun_bin) => rerun_bin,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
eprintln!("{EXECUTABLE_NOT_FOUND}");
return Err(err);
teh-cmc marked this conversation as resolved.
Show resolved Hide resolved
}
Err(err) => {
re_log::info!(%err, "Failed to spawn Rerun Viewer");
return Err(err);
teh-cmc marked this conversation as resolved.
Show resolved Hide resolved
}
};

// Simply forget about the child process, we want it to outlive the parent process if needed.
_ = rerun_bin;

Ok(())
}
6 changes: 2 additions & 4 deletions crates/re_types/src/archetypes/annotation_context.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions crates/re_types/src/archetypes/arrows3d.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions crates/re_types/src/archetypes/asset3d.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions crates/re_types/src/archetypes/bar_chart.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading