Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Web: Support multiple .rrd URLs #4740

Merged
merged 7 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions crates/re_log/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ pub fn setup_native_logging() {

#[cfg(target_arch = "wasm32")]
pub fn setup_web_logging() {
use std::sync::atomic::{AtomicBool, Ordering};

static LOG_INIT: AtomicBool = AtomicBool::new(false);
if LOG_INIT.load(Ordering::SeqCst) {
return;
}
LOG_INIT.store(true, Ordering::SeqCst);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calling new WebViewer twice would cause a panic here, because the logger would be set by the previous call. Not sure if this is the best way to fix this, as a panic during initialization will cause the logger to not work anymore.


crate::multi_logger::init().expect("Failed to set logger");
log::set_max_level(log::LevelFilter::Debug);
crate::add_boxed_logger(Box::new(crate::web_logger::WebLogger::new(
Expand Down
52 changes: 31 additions & 21 deletions crates/re_log_encoding/src/stream_rrd_from_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,24 @@ pub fn stream_rrd_from_http_to_channel(
on_msg();
}
match msg {
HttpMessage::LogMsg(msg) => tx.send(msg).warn_on_err_once("failed to send message"),
HttpMessage::LogMsg(msg) => {
if tx.send(msg).is_ok() {
ControlFlow::Continue(())
} else {
re_log::info!("Failed to send log message to viewer - closing");
ControlFlow::Break(())
}
}
HttpMessage::Success => {
tx.quit(None).warn_on_err_once("failed to send quit marker")
tx.quit(None).warn_on_err_once("failed to send quit marker");
ControlFlow::Break(())
}
HttpMessage::Failure(err) => {
tx.quit(Some(err))
.warn_on_err_once("failed to send quit marker");
ControlFlow::Break(())
}
HttpMessage::Failure(err) => tx
.quit(Some(err))
.warn_on_err_once("failed to send quit marker"),
};
}
}),
);
rx
Expand All @@ -46,7 +56,8 @@ pub enum HttpMessage {
Failure(Box<dyn std::error::Error + Send + Sync>),
}

pub type HttpMessageCallback = dyn Fn(HttpMessage) + Send + Sync;
// make this return ControlFlow
jprochazk marked this conversation as resolved.
Show resolved Hide resolved
pub type HttpMessageCallback = dyn Fn(HttpMessage) -> ControlFlow<()> + Send + Sync;

pub fn stream_rrd_from_http(url: String, on_msg: Arc<HttpMessageCallback>) {
re_log::debug!("Downloading .rrd file from {url:?}…");
Expand All @@ -69,41 +80,40 @@ pub fn stream_rrd_from_http(url: String, on_msg: Arc<HttpMessageCallback>) {
on_msg(HttpMessage::Failure(
format!("Failed to fetch .rrd file from {url}: {status} {status_text}")
.into(),
));
ControlFlow::Break(())
))
}
}
ehttp::streaming::Part::Chunk(chunk) => {
if chunk.is_empty() {
re_log::debug!("Finished decoding .rrd file from {url:?}…");
on_msg(HttpMessage::Success);
return ControlFlow::Break(());
return on_msg(HttpMessage::Success);
}

re_tracing::profile_scope!("decoding_rrd_stream");
decoder.borrow_mut().push_chunk(chunk);
loop {
match decoder.borrow_mut().try_read() {
Ok(message) => match message {
Some(message) => on_msg(HttpMessage::LogMsg(message)),
Some(message) => {
// only return if the callback asks us to
if on_msg(HttpMessage::LogMsg(message)).is_break() {
return ControlFlow::Break(());
}
}
None => return ControlFlow::Continue(()),
},
Err(err) => {
on_msg(HttpMessage::Failure(
return on_msg(HttpMessage::Failure(
format!("Failed to fetch .rrd file from {url}: {err}").into(),
));
return ControlFlow::Break(());
))
}
}
}
}
},
Err(err) => {
on_msg(HttpMessage::Failure(
format!("Failed to fetch .rrd file from {url}: {err}").into(),
));
ControlFlow::Break(())
}
Err(err) => on_msg(HttpMessage::Failure(
format!("Failed to fetch .rrd file from {url}: {err}").into(),
)),
}
});
}
Expand Down
13 changes: 13 additions & 0 deletions crates/re_smart_channel/src/receive_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,19 @@ impl<T: Send> ReceiveSet<T> {
self.receivers.lock().retain(|r| r.source() != source);
}

/// Disconnect from any channel with a source pointing at this `uri`.
#[cfg(target_arch = "wasm32")]
pub fn remove_by_uri(&self, uri: &str) {
self.receivers.lock().retain(|r| match r.source() {
// retain only sources which:
// - aren't network sources
// - don't point at the given `uri`
SmartChannelSource::RrdHttpStream { url } => url != uri,
SmartChannelSource::WsClient { ws_server_url } => ws_server_url != uri,
_ => true,
});
}

/// List of connected receiver sources.
///
/// This gets culled after calling one of the `recv` methods.
Expand Down
5 changes: 5 additions & 0 deletions crates/re_viewer/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1184,6 +1184,11 @@ impl eframe::App for App {
frame_start.elapsed().as_secs_f32(),
);
}

#[cfg(target_arch = "wasm32")]
fn as_any_mut(&mut self) -> Option<&mut dyn std::any::Any> {
Some(&mut *self)
}
}

/// Add built-in space views to the registry.
Expand Down
22 changes: 22 additions & 0 deletions crates/re_viewer/src/store_hub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,28 @@ impl StoreHub {
self.store_bundle.contains_recording(id)
}

/// Remove any recordings with a network source pointing at this `uri`.
#[cfg(target_arch = "wasm32")]
pub fn remove_recording_by_uri(&mut self, uri: &str) {
self.store_bundle.entity_dbs.retain(|_, db| {
let Some(data_source) = &db.data_source else {
// no data source, keep
return true;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this make sense? I'm not sure how it's possible to have a database without a data source, and if it should be handled differently here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah -- I'd love for us to refactor things to make this not possible. Mostly this shows up in unit-tests with programmatically created stores.

Any data being streamed into the app should have it's store set based on the channel-source though:
See: https://github.com/rerun-io/rerun/blob/main/crates/re_viewer/src/app.rs#L819-L821

};

// retain only sources which:
// - aren't network sources
// - don't point at the given `uri`
match data_source {
re_smart_channel::SmartChannelSource::RrdHttpStream { url } => url != uri,
re_smart_channel::SmartChannelSource::WsClient { ws_server_url } => {
ws_server_url != uri
}
_ => true,
}
});
}

/// Persist any in-use blueprints to durable storage.
// TODO(#2579): implement persistence for web
#[allow(clippy::unnecessary_wraps)]
Expand Down
119 changes: 76 additions & 43 deletions crates/re_viewer/src/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use eframe::wasm_bindgen::{self, prelude::*};

use std::ops::ControlFlow;
use std::sync::Arc;

use re_log::ResultExt as _;
Expand Down Expand Up @@ -80,6 +81,26 @@ impl WebHandle {
pub fn panic_callstack(&self) -> Option<String> {
self.runner.panic_summary().map(|s| s.callstack())
}

#[wasm_bindgen]
pub fn add_receiver(&self, url: &str) {
let Some(mut app) = self.runner.app_mut::<crate::App>() else {
return;
};
let rx = url_to_receiver(url, app.re_ui.egui_ctx.clone());
app.add_receiver(rx);
}

#[wasm_bindgen]
pub fn remove_receiver(&self, url: &str) {
let Some(mut app) = self.runner.app_mut::<crate::App>() else {
return;
};
app.msg_receive_set().remove_by_uri(url);
if let Some(store_hub) = app.store_hub.as_mut() {
store_hub.remove_recording_by_uri(url);
}
Comment on lines +100 to +102
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any situation where we want to remove the receiver, but keep the recording that has been received so far? I'm imagining a case where a user is connected to a live stream and they want to stop getting updates but still inspect the data.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely. The APIs exposed here were made just to faciliate the React-style magic. I guess we could expose a separate disconnect API that just closes the stream, and preserves the recording.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll leave it out of here for now, I don't know what that would look like on the React side

}
}

fn create_app(
Expand All @@ -102,11 +123,6 @@ fn create_app(
let re_ui = crate::customize_eframe(cc);

let egui_ctx = cc.egui_ctx.clone();
let wake_up_ui_on_msg = Box::new(move || {
// Spend a few more milliseconds decoding incoming messages,
// then trigger a repaint (https://github.com/rerun-io/rerun/issues/963):
egui_ctx.request_repaint_after(std::time::Duration::from_millis(10));
});

let mut app = crate::App::new(build_info, &app_env, startup_options, re_ui, cc.storage);

Expand All @@ -126,50 +142,67 @@ fn create_app(
None => query_map.get("url").map(String::as_str),
};
if let Some(url) = url {
let rx = match categorize_uri(url) {
EndpointCategory::HttpRrd(url) => {
re_log_encoding::stream_rrd_from_http::stream_rrd_from_http_to_channel(
url,
Some(wake_up_ui_on_msg),
)
}
EndpointCategory::WebEventListener => {
// Process an rrd when it's posted via `window.postMessage`
let (tx, rx) = re_smart_channel::smart_channel(
re_smart_channel::SmartMessageSource::RrdWebEventCallback,
re_smart_channel::SmartChannelSource::RrdWebEventListener,
);
re_log_encoding::stream_rrd_from_http::stream_rrd_from_event_listener(Arc::new({
move |msg| {
wake_up_ui_on_msg();
use re_log_encoding::stream_rrd_from_http::HttpMessage;
match msg {
HttpMessage::LogMsg(msg) => {
tx.send(msg).warn_on_err_once("failed to send message")
}
HttpMessage::Success => {
tx.quit(None).warn_on_err_once("failed to send quit marker")
}
HttpMessage::Failure(err) => tx
.quit(Some(err))
.warn_on_err_once("failed to send quit marker"),
};
}
}));
rx
}
EndpointCategory::WebSocket(url) => {
re_data_source::connect_to_ws_url(&url, Some(wake_up_ui_on_msg)).unwrap_or_else(
|err| panic!("Failed to connect to WebSocket server at {url}: {err}"),
)
}
};
let rx = url_to_receiver(url, egui_ctx.clone());
app.add_receiver(rx);
}

app
}

fn url_to_receiver(
url: &str,
egui_ctx: egui::Context,
) -> re_smart_channel::Receiver<re_log_types::LogMsg> {
let ui_waker = Box::new(move || {
// Spend a few more milliseconds decoding incoming messages,
// then trigger a repaint (https://github.com/rerun-io/rerun/issues/963):
egui_ctx.request_repaint_after(std::time::Duration::from_millis(10));
});
match categorize_uri(url) {
EndpointCategory::HttpRrd(url) => {
re_log_encoding::stream_rrd_from_http::stream_rrd_from_http_to_channel(
url,
Some(ui_waker),
)
}
EndpointCategory::WebEventListener => {
// Process an rrd when it's posted via `window.postMessage`
let (tx, rx) = re_smart_channel::smart_channel(
re_smart_channel::SmartMessageSource::RrdWebEventCallback,
re_smart_channel::SmartChannelSource::RrdWebEventListener,
);
re_log_encoding::stream_rrd_from_http::stream_rrd_from_event_listener(Arc::new({
move |msg| {
ui_waker();
use re_log_encoding::stream_rrd_from_http::HttpMessage;
match msg {
HttpMessage::LogMsg(msg) => {
if tx.send(msg).is_ok() {
ControlFlow::Continue(())
} else {
re_log::info!("Failed to send log message to viewer - closing");
ControlFlow::Break(())
}
}
HttpMessage::Success => {
tx.quit(None).warn_on_err_once("failed to send quit marker");
ControlFlow::Break(())
}
HttpMessage::Failure(err) => {
tx.quit(Some(err))
.warn_on_err_once("failed to send quit marker");
ControlFlow::Break(())
}
}
}
}));
rx
}
EndpointCategory::WebSocket(url) => re_data_source::connect_to_ws_url(&url, Some(ui_waker))
.unwrap_or_else(|err| panic!("Failed to connect to WebSocket server at {url}: {err}")),
}
}

/// Used to set the "email" property in the analytics config,
/// in the same way as `rerun analytics email [email protected]`.
///
Expand Down
Loading
Loading