Skip to content

Commit

Permalink
Make it possible to open http-streamed RRDs in follow mode via JS API. (
Browse files Browse the repository at this point in the history
#6326)

### What
- Resolves: #6243

In the context of the Gradio integration, although we are streaming over
http, from the perspective of system operation it feels more like a
websocket. In many situations we want new data to update immediately
once computation is done, rather than playing over time abstract time
domain.

Eventually we would like to control all of this behavior through
blueprint, but moving TimeControls to blueprint is a lot more work. We
already use the SmartChannelSource to make the determination of follow
mode, so with a little bit of additional plumbing, it's possible for us
to indicate this from the js API.

This is a pretty advanced edge-case and so I haven't exposed it to the
other mechanism through which an http source might be added. I assume
this will eventually be removed again once we have TimeControls
available via blueprint.

### Checklist
* [x] I have read and agree to [Contributor
Guide](https://github.com/rerun-io/rerun/blob/main/CONTRIBUTING.md) and
the [Code of
Conduct](https://github.com/rerun-io/rerun/blob/main/CODE_OF_CONDUCT.md)
* [x] I've included a screenshot or gif (if applicable)
* [x] I have tested the web demo (if applicable):
* Using examples from latest `main` build:
[rerun.io/viewer](https://rerun.io/viewer/pr/6326?manifest_url=https://app.rerun.io/version/main/examples_manifest.json)
* Using full set of examples from `nightly` build:
[rerun.io/viewer](https://rerun.io/viewer/pr/6326?manifest_url=https://app.rerun.io/version/nightly/examples_manifest.json)
* [x] The PR title and labels are set such as to maximize their
usefulness for the next release's CHANGELOG
* [x] If applicable, add a new check to the [release
checklist](https://github.com/rerun-io/rerun/blob/main/tests/python/release_checklist)!

- [PR Build Summary](https://build.rerun.io/pr/6326)
- [Recent benchmark results](https://build.rerun.io/graphs/crates.html)
- [Wasm size tracking](https://build.rerun.io/graphs/sizes.html)

To run all checks from `main`, comment on the PR with `@rerun-bot
full-check`.
  • Loading branch information
jleibs authored May 15, 2024
1 parent cb35d38 commit a869562
Show file tree
Hide file tree
Showing 13 changed files with 70 additions and 24 deletions.
24 changes: 17 additions & 7 deletions crates/re_data_source/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ use anyhow::Context as _;
pub enum DataSource {
/// A remote RRD file, served over http.
///
/// If `follow` is `true`, the viewer will open the stream in `Following` mode rather than `Playing` mode.
///
/// Could be either an `.rrd` recording or a `.rbl` blueprint.
RrdHttpUrl(String),
RrdHttpUrl { url: String, follow: bool },

/// A path to a local file.
#[cfg(not(target_arch = "wasm32"))]
Expand Down Expand Up @@ -90,15 +92,21 @@ impl DataSource {
|| uri.starts_with("https://")
|| (uri.starts_with("www.") && (uri.ends_with(".rrd") || uri.ends_with(".rbl")))
{
DataSource::RrdHttpUrl(uri)
DataSource::RrdHttpUrl {
url: uri,
follow: false,
}
} else if uri.starts_with("ws://") || uri.starts_with("wss://") {
DataSource::WebSocketAddr(uri)

// Now we are into heuristics territory:
} else if looks_like_a_file_path(&uri) {
DataSource::FilePath(file_source, path)
} else if uri.ends_with(".rrd") || uri.ends_with(".rbl") {
DataSource::RrdHttpUrl(uri)
DataSource::RrdHttpUrl {
url: uri,
follow: false,
}
} else {
// If this is sometyhing like `foo.com` we can't know what it is until we connect to it.
// We could/should connect and see what it is, but for now we just take a wild guess instead:
Expand All @@ -112,7 +120,7 @@ impl DataSource {

pub fn file_name(&self) -> Option<String> {
match self {
DataSource::RrdHttpUrl(url) => url.split('/').last().map(|r| r.to_owned()),
DataSource::RrdHttpUrl { url, .. } => url.split('/').last().map(|r| r.to_owned()),
#[cfg(not(target_arch = "wasm32"))]
DataSource::FilePath(_, path) => {
path.file_name().map(|s| s.to_string_lossy().to_string())
Expand Down Expand Up @@ -140,8 +148,10 @@ impl DataSource {
) -> anyhow::Result<Receiver<LogMsg>> {
re_tracing::profile_function!();
match self {
DataSource::RrdHttpUrl(url) => Ok(
re_log_encoding::stream_rrd_from_http::stream_rrd_from_http_to_channel(url, on_msg),
DataSource::RrdHttpUrl { url, follow } => Ok(
re_log_encoding::stream_rrd_from_http::stream_rrd_from_http_to_channel(
url, follow, on_msg,
),
),

#[cfg(not(target_arch = "wasm32"))]
Expand Down Expand Up @@ -256,7 +266,7 @@ fn test_data_source_from_uri() {
assert!(
matches!(
DataSource::from_uri(file_source, uri.to_owned()),
DataSource::RrdHttpUrl(_)
DataSource::RrdHttpUrl { .. }
),
"Expected {uri:?} to be categorized as RrdHttpUrl"
);
Expand Down
11 changes: 10 additions & 1 deletion crates/re_log_encoding/src/stream_rrd_from_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,23 @@ use std::sync::Arc;
use re_log::ResultExt as _;
use re_log_types::LogMsg;

/// Stream an rrd file from a HTTP server.
///
/// If `follow_if_http` is `true`, and the url is an HTTP source, the viewer will open the stream
/// in `Following` mode rather than `Playing` mode.
///
/// `on_msg` can be used to wake up the UI thread on Wasm.
pub fn stream_rrd_from_http_to_channel(
url: String,
follow: bool,
on_msg: Option<Box<dyn Fn() + Send + Sync>>,
) -> re_smart_channel::Receiver<LogMsg> {
let (tx, rx) = re_smart_channel::smart_channel(
re_smart_channel::SmartMessageSource::RrdHttpStream { url: url.clone() },
re_smart_channel::SmartChannelSource::RrdHttpStream { url: url.clone() },
re_smart_channel::SmartChannelSource::RrdHttpStream {
url: url.clone(),
follow,
},
);
stream_rrd_from_http(
url.clone(),
Expand Down
6 changes: 4 additions & 2 deletions crates/re_smart_channel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ pub enum SmartChannelSource {
File(std::path::PathBuf),

/// The channel was created in the context of loading an `.rrd` file over http.
RrdHttpStream { url: String },
///
/// The `follow` flag indicates whether the viewer should open the stream in `Following` mode rather than `Playing` mode.
RrdHttpStream { url: String, follow: bool },

/// The channel was created in the context of loading an `.rrd` file from a `postMessage`
/// js event.
Expand Down Expand Up @@ -66,7 +68,7 @@ impl std::fmt::Display for SmartChannelSource {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::File(path) => path.display().fmt(f),
Self::RrdHttpStream { url } => url.fmt(f),
Self::RrdHttpStream { url, follow: _ } => url.fmt(f),
Self::RrdWebEventListener => "Web event listener".fmt(f),
Self::JsChannel { channel_name } => write!(f, "Javascript channel: {channel_name}"),
Self::Sdk => "SDK".fmt(f),
Expand Down
2 changes: 1 addition & 1 deletion crates/re_smart_channel/src/receive_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl<T: Send> ReceiveSet<T> {
// retain only sources which:
// - aren't network sources
// - don't point at the given `uri`
SmartChannelSource::RrdHttpStream { url } => url != uri,
SmartChannelSource::RrdHttpStream { url, .. } => url != uri,
SmartChannelSource::WsClient { ws_server_url } => ws_server_url != uri,
_ => true,
});
Expand Down
2 changes: 1 addition & 1 deletion crates/re_viewer/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -793,7 +793,7 @@ impl App {
.map(|ctx| ctx.recording)
.and_then(|rec| rec.data_source.as_ref())
{
Some(SmartChannelSource::RrdHttpStream { url }) => format!("{href}?url={url}"),
Some(SmartChannelSource::RrdHttpStream { url, .. }) => format!("{href}?url={url}"),
_ => href,
};

Expand Down
5 changes: 3 additions & 2 deletions crates/re_viewer/src/app_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,11 +510,12 @@ fn recording_config_entry<'cfgs>(
// Play files from the start by default - it feels nice and alive.
// We assume the `RrdHttpStream` is a done recording.
re_smart_channel::SmartChannelSource::File(_)
| re_smart_channel::SmartChannelSource::RrdHttpStream { .. }
| re_smart_channel::SmartChannelSource::RrdHttpStream { follow: false, .. }
| re_smart_channel::SmartChannelSource::RrdWebEventListener => PlayState::Playing,

// Live data - follow it!
re_smart_channel::SmartChannelSource::Sdk
re_smart_channel::SmartChannelSource::RrdHttpStream { follow: true, .. }
| re_smart_channel::SmartChannelSource::Sdk
| re_smart_channel::SmartChannelSource::WsClient { .. }
| re_smart_channel::SmartChannelSource::TcpServer { .. }
| re_smart_channel::SmartChannelSource::Stdin
Expand Down
2 changes: 1 addition & 1 deletion crates/re_viewer/src/ui/recordings_panel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ fn loading_receivers_ui(ctx: &ViewerContext<'_>, rx: &ReceiveSet<LogMsg>, ui: &m
let string = match source.as_ref() {
// We only show things we know are very-soon-to-be recordings:
SmartChannelSource::File(path) => format!("Loading {}…", path.display()),
SmartChannelSource::RrdHttpStream { url } => format!("Loading {url}…"),
SmartChannelSource::RrdHttpStream { url, .. } => format!("Loading {url}…"),

SmartChannelSource::RrdWebEventListener
| SmartChannelSource::JsChannel { .. }
Expand Down
2 changes: 1 addition & 1 deletion crates/re_viewer/src/ui/top_panel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ fn connection_status_ui(ui: &mut egui::Ui, rx: &ReceiveSet<re_log_types::LogMsg>
format!("Loading {}…", path.display())
}
re_smart_channel::SmartChannelSource::Stdin => "Loading stdin…".to_owned(),
re_smart_channel::SmartChannelSource::RrdHttpStream { url } => {
re_smart_channel::SmartChannelSource::RrdHttpStream { url, .. } => {
format!("Loading {url}…")
}
re_smart_channel::SmartChannelSource::RrdWebEventListener
Expand Down
6 changes: 5 additions & 1 deletion crates/re_viewer/src/ui/welcome_screen/example_section.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,12 +459,16 @@ fn open_example_url(
}
}

let data_source = re_data_source::DataSource::RrdHttpUrl(rrd_url.to_owned());
let data_source = re_data_source::DataSource::RrdHttpUrl {
url: rrd_url.to_owned(),
follow: false,
};

// If the user re-download an already open recording, clear it out first
command_sender.send_system(SystemCommand::ClearSourceAndItsStores(
re_smart_channel::SmartChannelSource::RrdHttpStream {
url: rrd_url.to_owned(),
follow: false,
},
));

Expand Down
18 changes: 15 additions & 3 deletions crates/re_viewer/src/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,21 @@ impl WebHandle {
self.runner.panic_summary().map(|s| s.callstack())
}

/// Add a new receiver streaming data from the given url.
///
/// If `follow_if_http` is `true`, and the url is an HTTP source, the viewer will open the stream
/// in `Following` mode rather than `Playing` mode.
///
/// Websocket streams are always opened in `Following` mode.
///
/// It is an error to open a channel twice with the same id.
#[wasm_bindgen]
pub fn add_receiver(&self, url: &str) {
pub fn add_receiver(&self, url: &str, follow_if_http: Option<bool>) {
let Some(mut app) = self.runner.app_mut::<crate::App>() else {
return;
};
let rx = url_to_receiver(app.re_ui.egui_ctx.clone(), url);
let follow_if_http = follow_if_http.unwrap_or(false);
let rx = url_to_receiver(app.re_ui.egui_ctx.clone(), follow_if_http, url);
if let Some(rx) = rx.ok_or_log_error() {
app.add_receiver(rx);
}
Expand Down Expand Up @@ -252,7 +261,10 @@ fn create_app(
}

if let Some(url) = url {
if let Some(receiver) = url_to_receiver(cc.egui_ctx.clone(), url).ok_or_log_error() {
let follow_if_http = false;
if let Some(receiver) =
url_to_receiver(cc.egui_ctx.clone(), follow_if_http, url).ok_or_log_error()
{
app.add_receiver(receiver);
}
} else {
Expand Down
7 changes: 6 additions & 1 deletion crates/re_viewer/src/web_tools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,11 @@ pub fn translate_query_into_commands(egui_ctx: &egui::Context, command_sender: &
.flatten()
.collect();
if !urls.is_empty() {
let follow_if_http = false;
for url in urls {
if let Some(receiver) = url_to_receiver(egui_ctx.clone(), url).ok_or_log_error() {
if let Some(receiver) =
url_to_receiver(egui_ctx.clone(), follow_if_http, url).ok_or_log_error()
{
// We may be here because the user clicked Back/Forward in the browser while trying
// out examples. If we re-download the same file we should clear out the old data first.
command_sender.send_system(SystemCommand::ClearSourceAndItsStores(
Expand Down Expand Up @@ -197,6 +200,7 @@ impl EndpointCategory {
/// Start receiving from the given url.
pub fn url_to_receiver(
egui_ctx: egui::Context,
follow_if_http: bool,
url: &str,
) -> anyhow::Result<re_smart_channel::Receiver<re_log_types::LogMsg>> {
let ui_waker = Box::new(move || {
Expand All @@ -208,6 +212,7 @@ pub fn url_to_receiver(
EndpointCategory::HttpRrd(url) => Ok(
re_log_encoding::stream_rrd_from_http::stream_rrd_from_http_to_channel(
url,
follow_if_http,
Some(ui_waker),
),
),
Expand Down
2 changes: 1 addition & 1 deletion crates/re_viewer_context/src/store_hub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ impl StoreHub {
// - aren't network sources
// - don't point at the given `uri`
match data_source {
re_smart_channel::SmartChannelSource::RrdHttpStream { url } => url != uri,
re_smart_channel::SmartChannelSource::RrdHttpStream { url, .. } => url != uri,
re_smart_channel::SmartChannelSource::WsClient { ws_server_url } => {
ws_server_url != uri
}
Expand Down
7 changes: 5 additions & 2 deletions rerun_js/web-viewer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,17 @@ export class WebViewer {
* @see {WebViewer.start}
*
* @param {string | string[]} rrd URLs to `.rrd` files or WebSocket connections to our SDK.
* @param {{ follow_if_http?: boolean }} options
* - follow_if_http: Whether Rerun should open the resource in "Following" mode when streaming
* from an HTTP url. Defaults to `false`. Ignored for non-HTTP URLs.
*/
open(rrd) {
open(rrd, options = {}) {
if (!this.#handle) {
throw new Error(`attempted to open \`${rrd}\` in a stopped viewer`);
}
const urls = Array.isArray(rrd) ? rrd : [rrd];
for (const url of urls) {
this.#handle.add_receiver(url);
this.#handle.add_receiver(url, options.follow_if_http);
if (this.#handle.has_panicked()) {
throw new Error(`Web viewer crashed: ${this.#handle.panic_message()}`);
}
Expand Down

0 comments on commit a869562

Please sign in to comment.