Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spawn via $PATH 2: Redesign clap integration and clean up all examples #3997

Merged
merged 7 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions crates/re_types/src/archetypes/text_document.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/rerun/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ re_web_viewer_server = { workspace = true, optional = true }
clap = { workspace = true, features = ["derive"] }
puffin.workspace = true
rayon.workspace = true
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "time"] }


[build-dependencies]
Expand Down
133 changes: 63 additions & 70 deletions crates/rerun/src/clap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use std::{net::SocketAddr, path::PathBuf};

use re_sdk::RecordingStream;
use re_sdk::{RecordingStream, RecordingStreamBuilder};

// ---

Expand All @@ -15,13 +15,9 @@ enum RerunBehavior {
#[cfg(feature = "web_viewer")]
Serve,

#[cfg(feature = "native_viewer")]
Spawn,
}

// TODO(cmc): There are definitely ways of making this all nicer now (this, native_viewer and
// web_viewer).. but one thing at a time.

/// This struct implements a `clap::Parser` that defines all the arguments that a typical Rerun
/// application might use, and provides helpers to evaluate those arguments and behave
/// consequently.
Expand All @@ -43,7 +39,7 @@ enum RerunBehavior {
#[derive(Clone, Debug, clap::Args)]
#[clap(author, version, about)]
pub struct RerunArgs {
/// Start a viewer and feed it data in real-time.
/// Start a new Rerun Viewer process and feed it data in real-time.
#[clap(long, default_value = "true")]
spawn: bool,

Expand All @@ -68,81 +64,82 @@ pub struct RerunArgs {
bind: String,
}

/// [`RerunArgs::init`] might have to spawn a bunch of background tasks depending on what arguments
/// were passed in.
/// This object makes sure they live long enough and get polled as needed.
#[doc(hidden)]
#[derive(Default)]
pub struct ServeGuard {
tokio_rt: Option<tokio::runtime::Runtime>,
}

impl Drop for ServeGuard {
fn drop(&mut self) {
if let Some(tokio_rt) = self.tokio_rt.take() {
eprintln!("Sleeping indefinitely while serving web viewer... Press ^C when done.");
tokio_rt.block_on(async {
tokio::time::sleep(std::time::Duration::from_secs(u64::MAX)).await;
});
}
}
}

impl RerunArgs {
/// Set up Rerun, and run the given code with a [`RecordingStream`] object
/// that can be used to log data.
///
/// Logging will be controlled by the `RERUN` environment variable,
/// or the `default_enabled` argument if the environment variable is not set.
/// Creates a new [`RecordingStream`] according to the CLI parameters.
#[track_caller] // track_caller so that we can see if we are being called from an official example.
pub fn run(
&self,
application_id: &str,
default_enabled: bool,
run: impl FnOnce(RecordingStream) + Send + 'static,
) -> anyhow::Result<()> {
// Ensure we have a running tokio runtime.
let mut tokio_runtime = None;
let tokio_runtime_handle = if let Ok(handle) = tokio::runtime::Handle::try_current() {
handle
} else {
let rt = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime");
tokio_runtime.get_or_insert(rt).handle().clone()
};
let _tokio_runtime_guard = tokio_runtime_handle.enter();

let (rerun_enabled, store_info, batcher_config) =
crate::RecordingStreamBuilder::new(application_id)
.default_enabled(default_enabled)
.into_args();

if !rerun_enabled {
run(RecordingStream::disabled());
return Ok(());
}
pub fn init(&self, application_id: &str) -> anyhow::Result<(RecordingStream, ServeGuard)> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ergonomics of getting a tuple back where one part is super important and useful and the other an odd token I have to care around (but only when doing serve) are a bit odd. Even worse if I can init with serve from a scope now, it will not exit that scope iff I do serve (since the ServeGuard gets dropped). That's quite surprising behavior.
Suggestion for possible alternative: Given that the serve guard can't be killed, we might as well put it on a thread_local variable instead?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried a lot of things, including putting the guard in a thread local, but the maybe-owned tokio runtime just makes things weird no matter what...

We do want the user to be able to drop the guard, it's the only to shutdown all the web stuff that we've spawned in the background :/

match self.to_behavior()? {
RerunBehavior::Connect(addr) => Ok((
RecordingStreamBuilder::new(application_id)
.connect(addr, crate::default_flush_timeout())?,
Default::default(),
)),

let sink: Box<dyn re_sdk::sink::LogSink> = match self.to_behavior()? {
RerunBehavior::Connect(addr) => Box::new(crate::sink::TcpSink::new(
addr,
crate::default_flush_timeout(),
RerunBehavior::Save(path) => Ok((
RecordingStreamBuilder::new(application_id).save(path)?,
Default::default(),
)),

RerunBehavior::Save(path) => Box::new(crate::sink::FileSink::new(path)?),
RerunBehavior::Spawn => Ok((
RecordingStreamBuilder::new(application_id).spawn(crate::default_flush_timeout())?,
Default::default(),
)),

#[cfg(feature = "web_viewer")]
RerunBehavior::Serve => {
let mut tokio_rt = None;

// Get the Tokio runtime for the current thread, or create one if there isn't any.
// If we do create one, we'll have to make sure it both outlives and gets
// polled to completion as we return from this method!
let tokio_rt_handle = if let Ok(handle) = tokio::runtime::Handle::try_current() {
handle
} else {
tokio_rt
.get_or_insert(tokio::runtime::Runtime::new()?)
.handle()
.clone()
};

// Creating the actual web sink and associated servers will require the current
// thread to be in a Tokio context.
let _tokio_rt_guard = tokio_rt_handle.enter();

let open_browser = true;
re_sdk::web_viewer::new_sink(
open_browser,
let rec = RecordingStreamBuilder::new(application_id).serve(
&self.bind,
Default::default(),
Default::default(),
)?
}

#[cfg(feature = "native_viewer")]
RerunBehavior::Spawn => {
crate::native_viewer::spawn(store_info, batcher_config, run)?;
return Ok(());
}
};

let rec = RecordingStream::new(store_info, batcher_config, sink)?;
run(rec.clone());
open_browser,
)?;

// The user callback is done executing, it's a good opportunity to flush the pipeline
// independently of the current flush thresholds (which might be `NEVER`).
rec.flush_async();
// If we had to create a Tokio runtime from scratch, make sure it outlives this
// method and gets polled to completion.
let sleep_guard = ServeGuard { tokio_rt };

#[cfg(feature = "web_viewer")]
if matches!(self.to_behavior(), Ok(RerunBehavior::Serve)) {
// Sleep waiting for Ctrl-C:
tokio_runtime_handle.block_on(async {
tokio::time::sleep(std::time::Duration::from_secs(u64::MAX)).await;
});
Ok((rec, sleep_guard))
}
}

Ok(())
}

#[allow(clippy::unnecessary_wraps)] // False positive on some feature flags
Expand All @@ -162,10 +159,6 @@ impl RerunArgs {
None => {}
}

#[cfg(not(feature = "native_viewer"))]
anyhow::bail!("Expected --save, --connect, or --serve");

#[cfg(feature = "native_viewer")]
Ok(RerunBehavior::Spawn)
}
}
72 changes: 0 additions & 72 deletions crates/rerun/src/native_viewer.rs
Original file line number Diff line number Diff line change
@@ -1,58 +1,4 @@
use re_log_types::LogMsg;
use re_log_types::StoreInfo;
use re_sdk::RecordingStream;

/// Starts a Rerun viewer on the current thread and migrates the given callback, along with
/// the active `RecordingStream`, to a newly spawned thread where the callback will run until
/// completion.
///
/// All messages logged from the passed-in callback will be streamed to the viewer in
/// real-time.
///
/// The method will return when the viewer is closed.
///
/// ⚠️ This function must be called from the main thread since some platforms require that
/// their UI runs on the main thread! ⚠️
#[cfg(not(target_arch = "wasm32"))]
pub fn spawn<F>(
store_info: StoreInfo,
batcher_config: re_log_types::DataTableBatcherConfig,
run: F,
) -> re_viewer::external::eframe::Result<()>
where
F: FnOnce(RecordingStream) + Send + 'static,
{
let (tx, rx) = re_smart_channel::smart_channel(
re_smart_channel::SmartMessageSource::Sdk,
re_smart_channel::SmartChannelSource::Sdk,
);
let sink = Box::new(NativeViewerSink(tx));
let app_env = re_viewer::AppEnvironment::from_store_source(&store_info.store_source);

let rec =
RecordingStream::new(store_info, batcher_config, sink).expect("Failed to spawn thread");

// NOTE: Forget the handle on purpose, leave that thread be.
std::thread::Builder::new()
.name("spawned".into())
.spawn(move || run(rec))
.expect("Failed to spawn thread");

// NOTE: Some platforms still mandate that the UI must run on the main thread, so make sure
// to spawn the viewer in place and migrate the user callback to a new thread.
re_viewer::run_native_app(Box::new(move |cc, re_ui| {
let startup_options = re_viewer::StartupOptions::default();
let mut app = re_viewer::App::new(
re_build_info::build_info!(),
&app_env,
startup_options,
re_ui,
cc.storage,
);
app.add_receiver(rx);
Box::new(app)
}))
}

/// Starts a Rerun viewer to visualize the contents of a given array of messages.
/// The method will return when the viewer is closed.
Expand All @@ -78,21 +24,3 @@ pub fn show(msgs: Vec<LogMsg>) -> re_viewer::external::eframe::Result<()> {
msgs,
)
}

// ----------------------------------------------------------------------------

/// Stream log messages to a native viewer on the main thread.
#[cfg(feature = "native_viewer")]
struct NativeViewerSink(pub re_smart_channel::Sender<LogMsg>);

#[cfg(feature = "native_viewer")]
impl re_sdk::sink::LogSink for NativeViewerSink {
fn send(&self, msg: LogMsg) {
if let Err(err) = self.0.send(msg) {
re_log::error_once!("Failed to send log message to viewer: {err}");
}
}

#[inline]
fn flush_blocking(&self) {}
}
5 changes: 1 addition & 4 deletions examples/rust/clock/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@ license = "MIT OR Apache-2.0"
publish = false

[dependencies]
rerun = { path = "../../../crates/rerun", features = [
"native_viewer",
"web_viewer",
] }
rerun = { path = "../../../crates/rerun", features = ["web_viewer"] }

anyhow = "1.0"
clap = { version = "4.0", features = ["derive"] }
8 changes: 2 additions & 6 deletions examples/rust/clock/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,8 @@ fn main() -> anyhow::Result<()> {
use clap::Parser as _;
let args = Args::parse();

let default_enabled = true;
args.rerun
.clone()
.run("rerun_example_clock", default_enabled, move |rec| {
run(&rec, &args).unwrap();
})
let (rec, _serve_guard) = args.rerun.init("rerun_example_clock")?;
run(&rec, &args)
}

fn run(rec: &rerun::RecordingStream, args: &Args) -> anyhow::Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion examples/rust/dna/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const NUM_POINTS: usize = 100;

fn main() -> Result<(), Box<dyn std::error::Error>> {
let rec = rerun::RecordingStreamBuilder::new("rerun_example_dna_abacus")
.connect(rerun::default_server_addr(), rerun::default_flush_timeout())?;
.spawn(rerun::default_flush_timeout())?;
Comment on lines 15 to +16
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also need to fix this up in rust.md (the tutorial)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are so, so many places where we state the same things.. :(


let (points1, colors1) = color_spiral(NUM_POINTS, 2.0, 0.02, 0.0, 0.1);
let (points2, colors2) = color_spiral(NUM_POINTS, 2.0, 0.02, TAU * 0.5, 0.1);
Expand Down
2 changes: 1 addition & 1 deletion examples/rust/minimal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ license = "MIT OR Apache-2.0"
publish = false

[dependencies]
rerun = { path = "../../../crates/rerun", features = ["native_viewer"] }
rerun = { path = "../../../crates/rerun" }
4 changes: 2 additions & 2 deletions examples/rust/minimal/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
use rerun::{demo_util::grid, external::glam};

fn main() -> Result<(), Box<dyn std::error::Error>> {
let (rec, storage) = rerun::RecordingStreamBuilder::new("rerun_example_minimal_rs").memory()?;
let rec = rerun::RecordingStreamBuilder::new("rerun_example_minimal")
.spawn(rerun::default_flush_timeout())?;

let points = grid(glam::Vec3::splat(-10.0), glam::Vec3::splat(10.0), 10);
let colors = grid(glam::Vec3::ZERO, glam::Vec3::splat(255.0), 10)
Expand All @@ -16,6 +17,5 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.with_radii([0.5]),
)?;

rerun::native_viewer::show(storage.take())?;
Ok(())
}
5 changes: 1 addition & 4 deletions examples/rust/minimal_options/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@ license = "MIT OR Apache-2.0"
publish = false

[dependencies]
rerun = { path = "../../../crates/rerun", features = [
"native_viewer",
"web_viewer",
] }
rerun = { path = "../../../crates/rerun", features = ["web_viewer"] }

anyhow = "1.0"
clap = { version = "4.0", features = ["derive"] }
Expand Down
10 changes: 2 additions & 8 deletions examples/rust/minimal_options/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,8 @@ fn main() -> anyhow::Result<()> {
use clap::Parser as _;
let args = Args::parse();

let default_enabled = true;
args.rerun.clone().run(
"rerun_example_minimal_options",
default_enabled,
move |rec| {
run(&rec, &args).unwrap();
},
)
let (rec, _serve_guard) = args.rerun.init("rerun_example_minimal_options")?;
run(&rec, &args)
}

fn run(rec: &rerun::RecordingStream, args: &Args) -> anyhow::Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion examples/rust/minimal_serve/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let _guard = rt.enter();

let open_browser = true;
let rec = rerun::RecordingStreamBuilder::new("rerun_example_minimal_serve_rs").serve(
let rec = rerun::RecordingStreamBuilder::new("rerun_example_minimal_serve").serve(
"0.0.0.0",
Default::default(),
Default::default(),
Expand Down
Loading
Loading