Skip to content

Commit

Permalink
Web: Support multiple .rrd URLs (#4740)
Browse files Browse the repository at this point in the history
### What

Fixes #4630

New API can be seen here:
https://github.com/rerun-io/web-viewer-react-example/blob/main/src/App.tsx

### Checklist
* [x] I have read and agree to [Contributor
Guide](https://github.com/rerun-io/rerun/blob/main/CONTRIBUTING.md) and
the [Code of
Conduct](https://github.com/rerun-io/rerun/blob/main/CODE_OF_CONDUCT.md)
* [x] I've included a screenshot or gif (if applicable)
* [x] I have tested the web demo (if applicable):
* Using newly built examples:
[app.rerun.io](https://app.rerun.io/pr/4740/index.html)
* Using examples from latest `main` build:
[app.rerun.io](https://app.rerun.io/pr/4740/index.html?manifest_url=https://app.rerun.io/version/main/examples_manifest.json)
* Using full set of examples from `nightly` build:
[app.rerun.io](https://app.rerun.io/pr/4740/index.html?manifest_url=https://app.rerun.io/version/nightly/examples_manifest.json)
* [x] The PR title and labels are set such as to maximize their
usefulness for the next release's CHANGELOG

- [PR Build Summary](https://build.rerun.io/pr/4740)
- [Docs
preview](https://rerun.io/preview/165842ec644787f1b959e93fd14071c3ac7b6986/docs)
<!--DOCS-PREVIEW-->
- [Examples
preview](https://rerun.io/preview/165842ec644787f1b959e93fd14071c3ac7b6986/examples)
<!--EXAMPLES-PREVIEW-->
- [Recent benchmark results](https://build.rerun.io/graphs/crates.html)
- [Wasm size tracking](https://build.rerun.io/graphs/sizes.html)
  • Loading branch information
jprochazk authored Jan 8, 2024
1 parent 6131375 commit d204936
Show file tree
Hide file tree
Showing 9 changed files with 308 additions and 132 deletions.
8 changes: 8 additions & 0 deletions crates/re_log/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ pub fn setup_native_logging() {

#[cfg(target_arch = "wasm32")]
pub fn setup_web_logging() {
use std::sync::atomic::{AtomicBool, Ordering};

static LOG_INIT: AtomicBool = AtomicBool::new(false);
if LOG_INIT.load(Ordering::SeqCst) {
return;
}
LOG_INIT.store(true, Ordering::SeqCst);

crate::multi_logger::init().expect("Failed to set logger");
log::set_max_level(log::LevelFilter::Debug);
crate::add_boxed_logger(Box::new(crate::web_logger::WebLogger::new(
Expand Down
51 changes: 30 additions & 21 deletions crates/re_log_encoding/src/stream_rrd_from_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,24 @@ pub fn stream_rrd_from_http_to_channel(
on_msg();
}
match msg {
HttpMessage::LogMsg(msg) => tx.send(msg).warn_on_err_once("failed to send message"),
HttpMessage::LogMsg(msg) => {
if tx.send(msg).is_ok() {
ControlFlow::Continue(())
} else {
re_log::info!("Failed to send log message to viewer - closing");
ControlFlow::Break(())
}
}
HttpMessage::Success => {
tx.quit(None).warn_on_err_once("failed to send quit marker")
tx.quit(None).warn_on_err_once("failed to send quit marker");
ControlFlow::Break(())
}
HttpMessage::Failure(err) => {
tx.quit(Some(err))
.warn_on_err_once("failed to send quit marker");
ControlFlow::Break(())
}
HttpMessage::Failure(err) => tx
.quit(Some(err))
.warn_on_err_once("failed to send quit marker"),
};
}
}),
);
rx
Expand All @@ -46,7 +56,7 @@ pub enum HttpMessage {
Failure(Box<dyn std::error::Error + Send + Sync>),
}

pub type HttpMessageCallback = dyn Fn(HttpMessage) + Send + Sync;
pub type HttpMessageCallback = dyn Fn(HttpMessage) -> ControlFlow<()> + Send + Sync;

pub fn stream_rrd_from_http(url: String, on_msg: Arc<HttpMessageCallback>) {
re_log::debug!("Downloading .rrd file from {url:?}…");
Expand All @@ -69,41 +79,40 @@ pub fn stream_rrd_from_http(url: String, on_msg: Arc<HttpMessageCallback>) {
on_msg(HttpMessage::Failure(
format!("Failed to fetch .rrd file from {url}: {status} {status_text}")
.into(),
));
ControlFlow::Break(())
))
}
}
ehttp::streaming::Part::Chunk(chunk) => {
if chunk.is_empty() {
re_log::debug!("Finished decoding .rrd file from {url:?}…");
on_msg(HttpMessage::Success);
return ControlFlow::Break(());
return on_msg(HttpMessage::Success);
}

re_tracing::profile_scope!("decoding_rrd_stream");
decoder.borrow_mut().push_chunk(chunk);
loop {
match decoder.borrow_mut().try_read() {
Ok(message) => match message {
Some(message) => on_msg(HttpMessage::LogMsg(message)),
Some(message) => {
// only return if the callback asks us to
if on_msg(HttpMessage::LogMsg(message)).is_break() {
return ControlFlow::Break(());
}
}
None => return ControlFlow::Continue(()),
},
Err(err) => {
on_msg(HttpMessage::Failure(
return on_msg(HttpMessage::Failure(
format!("Failed to fetch .rrd file from {url}: {err}").into(),
));
return ControlFlow::Break(());
))
}
}
}
}
},
Err(err) => {
on_msg(HttpMessage::Failure(
format!("Failed to fetch .rrd file from {url}: {err}").into(),
));
ControlFlow::Break(())
}
Err(err) => on_msg(HttpMessage::Failure(
format!("Failed to fetch .rrd file from {url}: {err}").into(),
)),
}
});
}
Expand Down
13 changes: 13 additions & 0 deletions crates/re_smart_channel/src/receive_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,19 @@ impl<T: Send> ReceiveSet<T> {
self.receivers.lock().retain(|r| r.source() != source);
}

/// Disconnect from any channel with a source pointing at this `uri`.
#[cfg(target_arch = "wasm32")]
pub fn remove_by_uri(&self, uri: &str) {
self.receivers.lock().retain(|r| match r.source() {
// retain only sources which:
// - aren't network sources
// - don't point at the given `uri`
SmartChannelSource::RrdHttpStream { url } => url != uri,
SmartChannelSource::WsClient { ws_server_url } => ws_server_url != uri,
_ => true,
});
}

/// List of connected receiver sources.
///
/// This gets culled after calling one of the `recv` methods.
Expand Down
5 changes: 5 additions & 0 deletions crates/re_viewer/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1184,6 +1184,11 @@ impl eframe::App for App {
frame_start.elapsed().as_secs_f32(),
);
}

#[cfg(target_arch = "wasm32")]
fn as_any_mut(&mut self) -> Option<&mut dyn std::any::Any> {
Some(&mut *self)
}
}

/// Add built-in space views to the registry.
Expand Down
22 changes: 22 additions & 0 deletions crates/re_viewer/src/store_hub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,28 @@ impl StoreHub {
self.store_bundle.contains_recording(id)
}

/// Remove any recordings with a network source pointing at this `uri`.
#[cfg(target_arch = "wasm32")]
pub fn remove_recording_by_uri(&mut self, uri: &str) {
self.store_bundle.entity_dbs.retain(|_, db| {
let Some(data_source) = &db.data_source else {
// no data source, keep
return true;
};

// retain only sources which:
// - aren't network sources
// - don't point at the given `uri`
match data_source {
re_smart_channel::SmartChannelSource::RrdHttpStream { url } => url != uri,
re_smart_channel::SmartChannelSource::WsClient { ws_server_url } => {
ws_server_url != uri
}
_ => true,
}
});
}

/// Persist any in-use blueprints to durable storage.
// TODO(#2579): implement persistence for web
#[allow(clippy::unnecessary_wraps)]
Expand Down
119 changes: 76 additions & 43 deletions crates/re_viewer/src/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use eframe::wasm_bindgen::{self, prelude::*};

use std::ops::ControlFlow;
use std::sync::Arc;

use re_log::ResultExt as _;
Expand Down Expand Up @@ -80,6 +81,26 @@ impl WebHandle {
pub fn panic_callstack(&self) -> Option<String> {
self.runner.panic_summary().map(|s| s.callstack())
}

#[wasm_bindgen]
pub fn add_receiver(&self, url: &str) {
let Some(mut app) = self.runner.app_mut::<crate::App>() else {
return;
};
let rx = url_to_receiver(url, app.re_ui.egui_ctx.clone());
app.add_receiver(rx);
}

#[wasm_bindgen]
pub fn remove_receiver(&self, url: &str) {
let Some(mut app) = self.runner.app_mut::<crate::App>() else {
return;
};
app.msg_receive_set().remove_by_uri(url);
if let Some(store_hub) = app.store_hub.as_mut() {
store_hub.remove_recording_by_uri(url);
}
}
}

fn create_app(
Expand All @@ -102,11 +123,6 @@ fn create_app(
let re_ui = crate::customize_eframe(cc);

let egui_ctx = cc.egui_ctx.clone();
let wake_up_ui_on_msg = Box::new(move || {
// Spend a few more milliseconds decoding incoming messages,
// then trigger a repaint (https://github.com/rerun-io/rerun/issues/963):
egui_ctx.request_repaint_after(std::time::Duration::from_millis(10));
});

let mut app = crate::App::new(build_info, &app_env, startup_options, re_ui, cc.storage);

Expand All @@ -126,50 +142,67 @@ fn create_app(
None => query_map.get("url").map(String::as_str),
};
if let Some(url) = url {
let rx = match categorize_uri(url) {
EndpointCategory::HttpRrd(url) => {
re_log_encoding::stream_rrd_from_http::stream_rrd_from_http_to_channel(
url,
Some(wake_up_ui_on_msg),
)
}
EndpointCategory::WebEventListener => {
// Process an rrd when it's posted via `window.postMessage`
let (tx, rx) = re_smart_channel::smart_channel(
re_smart_channel::SmartMessageSource::RrdWebEventCallback,
re_smart_channel::SmartChannelSource::RrdWebEventListener,
);
re_log_encoding::stream_rrd_from_http::stream_rrd_from_event_listener(Arc::new({
move |msg| {
wake_up_ui_on_msg();
use re_log_encoding::stream_rrd_from_http::HttpMessage;
match msg {
HttpMessage::LogMsg(msg) => {
tx.send(msg).warn_on_err_once("failed to send message")
}
HttpMessage::Success => {
tx.quit(None).warn_on_err_once("failed to send quit marker")
}
HttpMessage::Failure(err) => tx
.quit(Some(err))
.warn_on_err_once("failed to send quit marker"),
};
}
}));
rx
}
EndpointCategory::WebSocket(url) => {
re_data_source::connect_to_ws_url(&url, Some(wake_up_ui_on_msg)).unwrap_or_else(
|err| panic!("Failed to connect to WebSocket server at {url}: {err}"),
)
}
};
let rx = url_to_receiver(url, egui_ctx.clone());
app.add_receiver(rx);
}

app
}

fn url_to_receiver(
url: &str,
egui_ctx: egui::Context,
) -> re_smart_channel::Receiver<re_log_types::LogMsg> {
let ui_waker = Box::new(move || {
// Spend a few more milliseconds decoding incoming messages,
// then trigger a repaint (https://github.com/rerun-io/rerun/issues/963):
egui_ctx.request_repaint_after(std::time::Duration::from_millis(10));
});
match categorize_uri(url) {
EndpointCategory::HttpRrd(url) => {
re_log_encoding::stream_rrd_from_http::stream_rrd_from_http_to_channel(
url,
Some(ui_waker),
)
}
EndpointCategory::WebEventListener => {
// Process an rrd when it's posted via `window.postMessage`
let (tx, rx) = re_smart_channel::smart_channel(
re_smart_channel::SmartMessageSource::RrdWebEventCallback,
re_smart_channel::SmartChannelSource::RrdWebEventListener,
);
re_log_encoding::stream_rrd_from_http::stream_rrd_from_event_listener(Arc::new({
move |msg| {
ui_waker();
use re_log_encoding::stream_rrd_from_http::HttpMessage;
match msg {
HttpMessage::LogMsg(msg) => {
if tx.send(msg).is_ok() {
ControlFlow::Continue(())
} else {
re_log::info!("Failed to send log message to viewer - closing");
ControlFlow::Break(())
}
}
HttpMessage::Success => {
tx.quit(None).warn_on_err_once("failed to send quit marker");
ControlFlow::Break(())
}
HttpMessage::Failure(err) => {
tx.quit(Some(err))
.warn_on_err_once("failed to send quit marker");
ControlFlow::Break(())
}
}
}
}));
rx
}
EndpointCategory::WebSocket(url) => re_data_source::connect_to_ws_url(&url, Some(ui_waker))
.unwrap_or_else(|err| panic!("Failed to connect to WebSocket server at {url}: {err}")),
}
}

/// Used to set the "email" property in the analytics config,
/// in the same way as `rerun analytics email [email protected]`.
///
Expand Down
Loading

0 comments on commit d204936

Please sign in to comment.