Skip to content

Commit

Permalink
Add support for loading many images, meshes, or rrds at once with rerun
Browse files Browse the repository at this point in the history
  • Loading branch information
emilk committed May 12, 2023
1 parent 8151328 commit 5c6de58
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 70 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ polars-core = "0.29"
polars-lazy = "0.29"
polars-ops = "0.29"
puffin = "0.14"
rayon = "1.7"
rfd = { version = "0.11.3", default_features = false, features = [
"xdg-portal",
] }
Expand Down
4 changes: 3 additions & 1 deletion crates/rerun/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,10 @@ webbrowser = { version = "0.8", optional = true }
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
backtrace = "0.3"
clap = { workspace = true, features = ["derive"] }
mimalloc.workspace = true
ctrlc.workspace = true
mimalloc.workspace = true
puffin.workspace = true
rayon.workspace = true
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }

# Native unix dependencies:
Expand Down
202 changes: 133 additions & 69 deletions crates/rerun/src/run.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::{atomic::AtomicBool, Arc};

use itertools::Itertools;
use re_log_types::{LogMsg, PythonVersion};
use re_smart_channel::Receiver;

Expand Down Expand Up @@ -96,11 +97,13 @@ struct Args {
#[clap(long)]
test_receive: bool,

/// Either a path to a `.rrd` file to load, an http url to an `.rrd` file,
/// Either: a path to `.rrd` file(s) to load,
/// some mesh or image files to show,
/// an http url to an `.rrd` file,
/// or a websocket url to a Rerun Server from which to read data
///
/// If none is given, a server will be hosted which the Rerun SDK can connect to.
url_or_path: Option<String>,
url_or_paths: Vec<String>,

/// Print version and quit
#[clap(long)]
Expand Down Expand Up @@ -305,50 +308,83 @@ async fn run_impl(
let (shutdown_rx, shutdown_bool) = setup_ctrl_c_handler();

// Where do we get the data from?
let rx = if let Some(url_or_path) = args.url_or_path.clone() {
match categorize_argument(url_or_path) {
ArgumentCategory::RrdHttpUrl(url) => {
re_log_encoding::stream_rrd_from_http::stream_rrd_from_http_to_channel(url)
}
ArgumentCategory::RrdFilePath(path) => {
re_log::info!("Loading {path:?}…");
load_file_to_channel(&path).with_context(|| format!("{path:?}"))?
let rx = if !args.url_or_paths.is_empty() {
if 1 < args.url_or_paths.len() {
// Load many files:
let paths = args
.url_or_paths
.iter()
.map(std::path::PathBuf::from)
.collect_vec();
let (tx, rx) = re_smart_channel::smart_channel(re_smart_channel::Source::Files {
paths: paths.clone(),
});

let recording_id = re_log_types::RecordingId::random();

// Load the files in parallel, and log errors.
// Failing to log one out of many files is not a big deal.
for path in paths {
let tx = tx.clone();
let recording_id = recording_id.clone();
rayon::spawn(move || {
if let Err(err) = load_file_to_channel_at(recording_id, &path, tx) {
re_log::error!("Failed to load {path:?}: {err}");
}
});
}
ArgumentCategory::WebSocketAddr(rerun_server_ws_url) => {
// We are connecting to a server at a websocket address:

if args.web_viewer {
#[cfg(feature = "web_viewer")]
{
let web_viewer = host_web_viewer(
args.web_viewer_port,
true,
rx
} else {
match categorize_argument(args.url_or_paths[0].clone()) {
ArgumentCategory::RrdHttpUrl(url) => {
re_log_encoding::stream_rrd_from_http::stream_rrd_from_http_to_channel(url)
}
ArgumentCategory::FilePath(path) => {
let (tx, rx) =
re_smart_channel::smart_channel(re_smart_channel::Source::Files {
paths: vec![path.clone()],
});
let recording_id = re_log_types::RecordingId::random();
load_file_to_channel_at(recording_id, &path, tx)
.with_context(|| format!("{path:?}"))?;
rx
}
ArgumentCategory::WebSocketAddr(rerun_server_ws_url) => {
// We are connecting to a server at a websocket address:

if args.web_viewer {
#[cfg(feature = "web_viewer")]
{
let web_viewer = host_web_viewer(
args.web_viewer_port,
true,
rerun_server_ws_url,
shutdown_rx.resubscribe(),
);
// We return here because the running [`WebViewerServer`] is all we need.
// The page we open will be pointed at a websocket url hosted by a *different* server.
return web_viewer.await;
}
#[cfg(not(feature = "web_viewer"))]
{
_ = (rerun_server_ws_url, shutdown_rx);
panic!("Can't host web-viewer - rerun was not compiled with the 'web_viewer' feature");
}
} else {
#[cfg(feature = "native_viewer")]
return native_viewer_connect_to_ws_url(
_build_info,
call_source.app_env(),
startup_options,
profiler,
rerun_server_ws_url,
shutdown_rx.resubscribe(),
);
// We return here because the running [`WebViewerServer`] is all we need.
// The page we open will be pointed at a websocket url hosted by a *different* server.
return web_viewer.await;
}
#[cfg(not(feature = "web_viewer"))]
{
_ = (rerun_server_ws_url, shutdown_rx);
panic!("Can't host web-viewer - rerun was not compiled with the 'web_viewer' feature");
}
} else {
#[cfg(feature = "native_viewer")]
return native_viewer_connect_to_ws_url(
_build_info,
call_source.app_env(),
startup_options,
profiler,
rerun_server_ws_url,
);

#[cfg(not(feature = "native_viewer"))]
{
_ = (call_source, rerun_server_ws_url);
anyhow::bail!("Can't start viewer - rerun was compiled without the 'native_viewer' feature");

#[cfg(not(feature = "native_viewer"))]
{
_ = (call_source, rerun_server_ws_url);
anyhow::bail!("Can't start viewer - rerun was compiled without the 'native_viewer' feature");
}
}
}
}
Expand Down Expand Up @@ -379,7 +415,7 @@ async fn run_impl(
#[cfg(feature = "web_viewer")]
{
#[cfg(feature = "server")]
if args.url_or_path.is_none()
if args.url_or_paths.is_empty()
&& (args.port == args.web_viewer_port.0 || args.port == args.ws_server_port.0)
{
anyhow::bail!(
Expand Down Expand Up @@ -497,7 +533,7 @@ enum ArgumentCategory {
RrdHttpUrl(String),

/// A path to a local file.
RrdFilePath(std::path::PathBuf),
FilePath(std::path::PathBuf),

/// A remote Rerun server.
WebSocketAddr(String),
Expand All @@ -506,12 +542,12 @@ enum ArgumentCategory {
fn categorize_argument(mut uri: String) -> ArgumentCategory {
let path = std::path::Path::new(&uri).to_path_buf();

if uri.starts_with("http") {
if uri.starts_with("file://") || path.exists() {
ArgumentCategory::FilePath(path)
} else if uri.starts_with("http") {
ArgumentCategory::RrdHttpUrl(uri)
} else if uri.starts_with("ws") {
ArgumentCategory::WebSocketAddr(uri)
} else if uri.starts_with("file://") || path.exists() || uri.ends_with(".rrd") {
ArgumentCategory::RrdFilePath(path)
} else {
// If this is sometyhing like `foo.com` we can't know what it is until we connect to it.
// We could/should connect and see what it is, but for now we just take a wild guess instead:
Expand Down Expand Up @@ -547,33 +583,61 @@ fn native_viewer_connect_to_ws_url(
Ok(())
}

fn load_file_to_channel(path: &std::path::Path) -> anyhow::Result<Receiver<LogMsg>> {
fn load_file_to_channel_at(
recording_id: re_log_types::RecordingId,
path: &std::path::Path,
tx: re_smart_channel::Sender<LogMsg>,
) -> Result<(), anyhow::Error> {
puffin::profile_function!(path.to_string_lossy());
re_log::info!("Loading {path:?}…");

let extension = path
.extension()
.unwrap_or_default()
.to_ascii_lowercase()
.to_string_lossy()
.to_string();

if extension == "rrd" {
load_rrd_file_to_channel(path.to_owned(), tx)
} else {
#[cfg(feature = "sdk")]
{
let log_msg = re_sdk::MsgSender::from_file_path(path)?.into_log_msg(recording_id)?;
tx.send(log_msg).ok();
Ok(())
}

#[cfg(not(feature = "sdk"))]
{
_ = recording_id;
anyhow::bail!("Unsupported file extension: '{extension}' for path {path:?}. Try enabvling the 'sdk' feature of 'rerun'.");
}
}
}

fn load_rrd_file_to_channel(
path: std::path::PathBuf,
tx: re_smart_channel::Sender<LogMsg>,
) -> anyhow::Result<()> {
use anyhow::Context as _;
let file = std::fs::File::open(path).context("Failed to open file")?;
let file = std::fs::File::open(&path).context("Failed to open file")?;
let decoder = re_log_encoding::decoder::Decoder::new(file)?;

let (tx, rx) = re_smart_channel::smart_channel(re_smart_channel::Source::File {
path: path.to_owned(),
});

let path = path.to_owned();
std::thread::Builder::new()
.name("rrd_file_reader".into())
.spawn(move || {
for msg in decoder {
match msg {
Ok(msg) => {
tx.send(msg).ok();
}
Err(err) => {
re_log::warn_once!("Failed to decode message in {path:?}: {err}");
}
rayon::spawn(move || {
for msg in decoder {
match msg {
Ok(msg) => {
tx.send(msg).ok();
}
Err(err) => {
re_log::warn_once!("Failed to decode message in {path:?}: {err}");
}
}
})
.expect("Failed to spawn thread");
}
});

Ok(rx)
Ok(())
}

fn stream_to_rrd(
Expand Down

0 comments on commit 5c6de58

Please sign in to comment.