Skip to content

Commit

Permalink
Spawn via $PATH 2: Redesign clap integration and clean up all exa…
Browse files Browse the repository at this point in the history
…mples (#3997)

- Get rid of the old thread-based `spawn` functionality.
- Redesign `RerunArgs` to get rid of the awful callback while still
dealing with Tokio's TLS shenanigans.
- Update all tests and examples.

The new `RerunArgs` combined with the new `spawn` from PATH now make for
a pretty nice experience:
```rust
let args = Args::parse();
let (rec, _serve_guard) = args.rerun.init("my_app")?;
// do stuff with rec
```

---

Spawn via `$PATH` series:
- #3996
- #3997
- #3998

---

- Fixes #2109
  • Loading branch information
teh-cmc authored Oct 26, 2023
1 parent c7d26a1 commit c1a91d7
Show file tree
Hide file tree
Showing 38 changed files with 136 additions and 353 deletions.
5 changes: 2 additions & 3 deletions crates/re_types/src/archetypes/text_document.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/rerun/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ re_web_viewer_server = { workspace = true, optional = true }
clap = { workspace = true, features = ["derive"] }
puffin.workspace = true
rayon.workspace = true
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "time"] }


[build-dependencies]
Expand Down
133 changes: 63 additions & 70 deletions crates/rerun/src/clap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use std::{net::SocketAddr, path::PathBuf};

use re_sdk::RecordingStream;
use re_sdk::{RecordingStream, RecordingStreamBuilder};

// ---

Expand All @@ -15,13 +15,9 @@ enum RerunBehavior {
#[cfg(feature = "web_viewer")]
Serve,

#[cfg(feature = "native_viewer")]
Spawn,
}

// TODO(cmc): There are definitely ways of making this all nicer now (this, native_viewer and
// web_viewer).. but one thing at a time.

/// This struct implements a `clap::Parser` that defines all the arguments that a typical Rerun
/// application might use, and provides helpers to evaluate those arguments and behave
/// consequently.
Expand All @@ -43,7 +39,7 @@ enum RerunBehavior {
#[derive(Clone, Debug, clap::Args)]
#[clap(author, version, about)]
pub struct RerunArgs {
/// Start a viewer and feed it data in real-time.
/// Start a new Rerun Viewer process and feed it data in real-time.
#[clap(long, default_value = "true")]
spawn: bool,

Expand All @@ -68,81 +64,82 @@ pub struct RerunArgs {
bind: String,
}

/// [`RerunArgs::init`] might have to spawn a bunch of background tasks depending on what arguments
/// were passed in.
/// This object makes sure they live long enough and get polled as needed.
#[doc(hidden)]
#[derive(Default)]
pub struct ServeGuard {
tokio_rt: Option<tokio::runtime::Runtime>,
}

impl Drop for ServeGuard {
fn drop(&mut self) {
if let Some(tokio_rt) = self.tokio_rt.take() {
eprintln!("Sleeping indefinitely while serving web viewer... Press ^C when done.");
tokio_rt.block_on(async {
tokio::time::sleep(std::time::Duration::from_secs(u64::MAX)).await;
});
}
}
}

impl RerunArgs {
/// Set up Rerun, and run the given code with a [`RecordingStream`] object
/// that can be used to log data.
///
/// Logging will be controlled by the `RERUN` environment variable,
/// or the `default_enabled` argument if the environment variable is not set.
/// Creates a new [`RecordingStream`] according to the CLI parameters.
#[track_caller] // track_caller so that we can see if we are being called from an official example.
pub fn run(
&self,
application_id: &str,
default_enabled: bool,
run: impl FnOnce(RecordingStream) + Send + 'static,
) -> anyhow::Result<()> {
// Ensure we have a running tokio runtime.
let mut tokio_runtime = None;
let tokio_runtime_handle = if let Ok(handle) = tokio::runtime::Handle::try_current() {
handle
} else {
let rt = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime");
tokio_runtime.get_or_insert(rt).handle().clone()
};
let _tokio_runtime_guard = tokio_runtime_handle.enter();

let (rerun_enabled, store_info, batcher_config) =
crate::RecordingStreamBuilder::new(application_id)
.default_enabled(default_enabled)
.into_args();

if !rerun_enabled {
run(RecordingStream::disabled());
return Ok(());
}
pub fn init(&self, application_id: &str) -> anyhow::Result<(RecordingStream, ServeGuard)> {
match self.to_behavior()? {
RerunBehavior::Connect(addr) => Ok((
RecordingStreamBuilder::new(application_id)
.connect(addr, crate::default_flush_timeout())?,
Default::default(),
)),

let sink: Box<dyn re_sdk::sink::LogSink> = match self.to_behavior()? {
RerunBehavior::Connect(addr) => Box::new(crate::sink::TcpSink::new(
addr,
crate::default_flush_timeout(),
RerunBehavior::Save(path) => Ok((
RecordingStreamBuilder::new(application_id).save(path)?,
Default::default(),
)),

RerunBehavior::Save(path) => Box::new(crate::sink::FileSink::new(path)?),
RerunBehavior::Spawn => Ok((
RecordingStreamBuilder::new(application_id).spawn(crate::default_flush_timeout())?,
Default::default(),
)),

#[cfg(feature = "web_viewer")]
RerunBehavior::Serve => {
let mut tokio_rt = None;

// Get the Tokio runtime for the current thread, or create one if there isn't any.
// If we do create one, we'll have to make sure it both outlives and gets
// polled to completion as we return from this method!
let tokio_rt_handle = if let Ok(handle) = tokio::runtime::Handle::try_current() {
handle
} else {
tokio_rt
.get_or_insert(tokio::runtime::Runtime::new()?)
.handle()
.clone()
};

// Creating the actual web sink and associated servers will require the current
// thread to be in a Tokio context.
let _tokio_rt_guard = tokio_rt_handle.enter();

let open_browser = true;
re_sdk::web_viewer::new_sink(
open_browser,
let rec = RecordingStreamBuilder::new(application_id).serve(
&self.bind,
Default::default(),
Default::default(),
)?
}

#[cfg(feature = "native_viewer")]
RerunBehavior::Spawn => {
crate::native_viewer::spawn(store_info, batcher_config, run)?;
return Ok(());
}
};

let rec = RecordingStream::new(store_info, batcher_config, sink)?;
run(rec.clone());
open_browser,
)?;

// The user callback is done executing, it's a good opportunity to flush the pipeline
// independently of the current flush thresholds (which might be `NEVER`).
rec.flush_async();
// If we had to create a Tokio runtime from scratch, make sure it outlives this
// method and gets polled to completion.
let sleep_guard = ServeGuard { tokio_rt };

#[cfg(feature = "web_viewer")]
if matches!(self.to_behavior(), Ok(RerunBehavior::Serve)) {
// Sleep waiting for Ctrl-C:
tokio_runtime_handle.block_on(async {
tokio::time::sleep(std::time::Duration::from_secs(u64::MAX)).await;
});
Ok((rec, sleep_guard))
}
}

Ok(())
}

#[allow(clippy::unnecessary_wraps)] // False positive on some feature flags
Expand All @@ -162,10 +159,6 @@ impl RerunArgs {
None => {}
}

#[cfg(not(feature = "native_viewer"))]
anyhow::bail!("Expected --save, --connect, or --serve");

#[cfg(feature = "native_viewer")]
Ok(RerunBehavior::Spawn)
}
}
72 changes: 0 additions & 72 deletions crates/rerun/src/native_viewer.rs
Original file line number Diff line number Diff line change
@@ -1,58 +1,4 @@
use re_log_types::LogMsg;
use re_log_types::StoreInfo;
use re_sdk::RecordingStream;

/// Starts a Rerun viewer on the current thread and migrates the given callback, along with
/// the active `RecordingStream`, to a newly spawned thread where the callback will run until
/// completion.
///
/// All messages logged from the passed-in callback will be streamed to the viewer in
/// real-time.
///
/// The method will return when the viewer is closed.
///
/// ⚠️ This function must be called from the main thread since some platforms require that
/// their UI runs on the main thread! ⚠️
#[cfg(not(target_arch = "wasm32"))]
pub fn spawn<F>(
store_info: StoreInfo,
batcher_config: re_log_types::DataTableBatcherConfig,
run: F,
) -> re_viewer::external::eframe::Result<()>
where
F: FnOnce(RecordingStream) + Send + 'static,
{
let (tx, rx) = re_smart_channel::smart_channel(
re_smart_channel::SmartMessageSource::Sdk,
re_smart_channel::SmartChannelSource::Sdk,
);
let sink = Box::new(NativeViewerSink(tx));
let app_env = re_viewer::AppEnvironment::from_store_source(&store_info.store_source);

let rec =
RecordingStream::new(store_info, batcher_config, sink).expect("Failed to spawn thread");

// NOTE: Forget the handle on purpose, leave that thread be.
std::thread::Builder::new()
.name("spawned".into())
.spawn(move || run(rec))
.expect("Failed to spawn thread");

// NOTE: Some platforms still mandate that the UI must run on the main thread, so make sure
// to spawn the viewer in place and migrate the user callback to a new thread.
re_viewer::run_native_app(Box::new(move |cc, re_ui| {
let startup_options = re_viewer::StartupOptions::default();
let mut app = re_viewer::App::new(
re_build_info::build_info!(),
&app_env,
startup_options,
re_ui,
cc.storage,
);
app.add_receiver(rx);
Box::new(app)
}))
}

/// Starts a Rerun viewer to visualize the contents of a given array of messages.
/// The method will return when the viewer is closed.
Expand All @@ -78,21 +24,3 @@ pub fn show(msgs: Vec<LogMsg>) -> re_viewer::external::eframe::Result<()> {
msgs,
)
}

// ----------------------------------------------------------------------------

/// Stream log messages to a native viewer on the main thread.
#[cfg(feature = "native_viewer")]
struct NativeViewerSink(pub re_smart_channel::Sender<LogMsg>);

#[cfg(feature = "native_viewer")]
impl re_sdk::sink::LogSink for NativeViewerSink {
fn send(&self, msg: LogMsg) {
if let Err(err) = self.0.send(msg) {
re_log::error_once!("Failed to send log message to viewer: {err}");
}
}

#[inline]
fn flush_blocking(&self) {}
}
5 changes: 1 addition & 4 deletions examples/rust/clock/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@ license = "MIT OR Apache-2.0"
publish = false

[dependencies]
rerun = { path = "../../../crates/rerun", features = [
"native_viewer",
"web_viewer",
] }
rerun = { path = "../../../crates/rerun", features = ["web_viewer"] }

anyhow = "1.0"
clap = { version = "4.0", features = ["derive"] }
8 changes: 2 additions & 6 deletions examples/rust/clock/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,8 @@ fn main() -> anyhow::Result<()> {
use clap::Parser as _;
let args = Args::parse();

let default_enabled = true;
args.rerun
.clone()
.run("rerun_example_clock", default_enabled, move |rec| {
run(&rec, &args).unwrap();
})
let (rec, _serve_guard) = args.rerun.init("rerun_example_clock")?;
run(&rec, &args)
}

fn run(rec: &rerun::RecordingStream, args: &Args) -> anyhow::Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion examples/rust/dna/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const NUM_POINTS: usize = 100;

fn main() -> Result<(), Box<dyn std::error::Error>> {
let rec = rerun::RecordingStreamBuilder::new("rerun_example_dna_abacus")
.connect(rerun::default_server_addr(), rerun::default_flush_timeout())?;
.spawn(rerun::default_flush_timeout())?;

let (points1, colors1) = color_spiral(NUM_POINTS, 2.0, 0.02, 0.0, 0.1);
let (points2, colors2) = color_spiral(NUM_POINTS, 2.0, 0.02, TAU * 0.5, 0.1);
Expand Down
2 changes: 1 addition & 1 deletion examples/rust/minimal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ license = "MIT OR Apache-2.0"
publish = false

[dependencies]
rerun = { path = "../../../crates/rerun", features = ["native_viewer"] }
rerun = { path = "../../../crates/rerun" }
4 changes: 2 additions & 2 deletions examples/rust/minimal/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
use rerun::{demo_util::grid, external::glam};

fn main() -> Result<(), Box<dyn std::error::Error>> {
let (rec, storage) = rerun::RecordingStreamBuilder::new("rerun_example_minimal_rs").memory()?;
let rec = rerun::RecordingStreamBuilder::new("rerun_example_minimal")
.spawn(rerun::default_flush_timeout())?;

let points = grid(glam::Vec3::splat(-10.0), glam::Vec3::splat(10.0), 10);
let colors = grid(glam::Vec3::ZERO, glam::Vec3::splat(255.0), 10)
Expand All @@ -16,6 +17,5 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.with_radii([0.5]),
)?;

rerun::native_viewer::show(storage.take())?;
Ok(())
}
5 changes: 1 addition & 4 deletions examples/rust/minimal_options/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@ license = "MIT OR Apache-2.0"
publish = false

[dependencies]
rerun = { path = "../../../crates/rerun", features = [
"native_viewer",
"web_viewer",
] }
rerun = { path = "../../../crates/rerun", features = ["web_viewer"] }

anyhow = "1.0"
clap = { version = "4.0", features = ["derive"] }
Expand Down
10 changes: 2 additions & 8 deletions examples/rust/minimal_options/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,8 @@ fn main() -> anyhow::Result<()> {
use clap::Parser as _;
let args = Args::parse();

let default_enabled = true;
args.rerun.clone().run(
"rerun_example_minimal_options",
default_enabled,
move |rec| {
run(&rec, &args).unwrap();
},
)
let (rec, _serve_guard) = args.rerun.init("rerun_example_minimal_options")?;
run(&rec, &args)
}

fn run(rec: &rerun::RecordingStream, args: &Args) -> anyhow::Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion examples/rust/minimal_serve/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let _guard = rt.enter();

let open_browser = true;
let rec = rerun::RecordingStreamBuilder::new("rerun_example_minimal_serve_rs").serve(
let rec = rerun::RecordingStreamBuilder::new("rerun_example_minimal_serve").serve(
"0.0.0.0",
Default::default(),
Default::default(),
Expand Down
Loading

0 comments on commit c1a91d7

Please sign in to comment.