Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a new javascript API for submitting an RRD that is stored directly as bytes #6189

Merged
merged 9 commits into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/re_log_encoding/src/stream_rrd_from_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ mod web_event_listener {
pub use web_event_listener::stream_rrd_from_event_listener;

#[cfg(target_arch = "wasm32")]
mod web_decode {
pub mod web_decode {
use super::{HttpMessage, HttpMessageCallback};
use std::sync::Arc;

Expand Down
13 changes: 12 additions & 1 deletion crates/re_smart_channel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ pub enum SmartChannelSource {
/// Used for the inline web viewer in a notebook.
RrdWebEventListener,

/// The channel was created in the context of a javascript client submitting an RRD directly as bytes.
JsBytes,

/// The channel was created in the context of loading data using a Rerun SDK sharing the same
/// process.
Sdk,
Expand Down Expand Up @@ -62,6 +65,7 @@ impl std::fmt::Display for SmartChannelSource {
Self::File(path) => path.display().fmt(f),
Self::RrdHttpStream { url } => url.fmt(f),
Self::RrdWebEventListener => "Web event listener".fmt(f),
Self::JsBytes => "Javascript".fmt(f),
Self::Sdk => "SDK".fmt(f),
Self::WsClient { ws_server_url } => ws_server_url.fmt(f),
Self::TcpServer { port } => write!(f, "TCP server, port {port}"),
Expand All @@ -74,7 +78,10 @@ impl SmartChannelSource {
pub fn is_network(&self) -> bool {
match self {
Self::File(_) | Self::Sdk | Self::RrdWebEventListener | Self::Stdin => false,
Self::RrdHttpStream { .. } | Self::WsClient { .. } | Self::TcpServer { .. } => true,
Self::RrdHttpStream { .. }
| Self::WsClient { .. }
| Self::JsBytes
| Self::TcpServer { .. } => true,
}
}
}
Expand Down Expand Up @@ -104,6 +111,9 @@ pub enum SmartMessageSource {
/// Only applicable to web browser iframes.
RrdWebEventCallback,

/// The sender is a javascript client submitting an RRD directly as bytes.
JsBytes,

/// The sender is a Rerun SDK running from another thread in the same process.
Sdk,

Expand Down Expand Up @@ -133,6 +143,7 @@ impl std::fmt::Display for SmartMessageSource {
SmartMessageSource::File(path) => format!("file://{}", path.to_string_lossy()),
SmartMessageSource::RrdHttpStream { url } => format!("http://{url}"),
SmartMessageSource::RrdWebEventCallback => "web_callback".into(),
SmartMessageSource::JsBytes => "javascript".into(),
SmartMessageSource::Sdk => "sdk".into(),
SmartMessageSource::WsClient { ws_server_url } => ws_server_url.clone(),
SmartMessageSource::TcpClient { addr } => format!(
Expand Down
4 changes: 3 additions & 1 deletion crates/re_viewer/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ impl App {
SmartChannelSource::File(_) | SmartChannelSource::RrdHttpStream { .. } => false,

SmartChannelSource::WsClient { .. }
| SmartChannelSource::JsBytes
| SmartChannelSource::RrdWebEventListener
| SmartChannelSource::Sdk
| SmartChannelSource::TcpServer { .. }
Expand Down Expand Up @@ -1207,7 +1208,8 @@ impl App {
| SmartChannelSource::Stdin
| SmartChannelSource::RrdWebEventListener
| SmartChannelSource::Sdk
| SmartChannelSource::WsClient { .. } => {
| SmartChannelSource::WsClient { .. }
| SmartChannelSource::JsBytes => {
return true; // We expect data soon, so fade-in
}

Expand Down
3 changes: 2 additions & 1 deletion crates/re_viewer/src/app_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,8 @@ fn recording_config_entry<'cfgs>(
re_smart_channel::SmartChannelSource::Sdk
| re_smart_channel::SmartChannelSource::WsClient { .. }
| re_smart_channel::SmartChannelSource::TcpServer { .. }
| re_smart_channel::SmartChannelSource::Stdin => PlayState::Following,
| re_smart_channel::SmartChannelSource::Stdin
| re_smart_channel::SmartChannelSource::JsBytes => PlayState::Following,
}
} else {
PlayState::Following // No known source 🤷‍♂️
Expand Down
1 change: 1 addition & 0 deletions crates/re_viewer/src/ui/recordings_panel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ fn loading_receivers_ui(ctx: &ViewerContext<'_>, rx: &ReceiveSet<LogMsg>, ui: &m
SmartChannelSource::RrdHttpStream { url } => format!("Loading {url}…"),

SmartChannelSource::RrdWebEventListener
| SmartChannelSource::JsBytes
| SmartChannelSource::Sdk
| SmartChannelSource::WsClient { .. }
| SmartChannelSource::TcpServer { .. }
Expand Down
5 changes: 4 additions & 1 deletion crates/re_viewer/src/ui/top_panel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ fn connection_status_ui(ui: &mut egui::Ui, rx: &ReceiveSet<re_log_types::LogMsg>
re_smart_channel::SmartChannelSource::RrdWebEventListener
| re_smart_channel::SmartChannelSource::Sdk
| re_smart_channel::SmartChannelSource::WsClient { .. }
| re_smart_channel::SmartChannelSource::TcpServer { .. } => true,
| re_smart_channel::SmartChannelSource::TcpServer { .. }
| re_smart_channel::SmartChannelSource::JsBytes => true,
}
})
.collect_vec();
Expand Down Expand Up @@ -182,6 +183,7 @@ fn connection_status_ui(ui: &mut egui::Ui, rx: &ReceiveSet<re_log_types::LogMsg>
| SmartChannelSource::Stdin
| SmartChannelSource::RrdHttpStream { .. }
| SmartChannelSource::RrdWebEventListener
| SmartChannelSource::JsBytes
| SmartChannelSource::Sdk
| SmartChannelSource::WsClient { .. } => None,

Expand Down Expand Up @@ -209,6 +211,7 @@ fn connection_status_ui(ui: &mut egui::Ui, rx: &ReceiveSet<re_log_types::LogMsg>
re_smart_channel::SmartChannelSource::RrdWebEventListener => {
"Waiting for logging data…".to_owned()
}
re_smart_channel::SmartChannelSource::JsBytes => "Waiting for logging data…".to_owned(),
re_smart_channel::SmartChannelSource::Sdk => {
"Waiting for logging data from SDK".to_owned()
}
Expand Down
3 changes: 2 additions & 1 deletion crates/re_viewer/src/viewer_analytics/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ pub fn open_recording(
re_smart_channel::SmartChannelSource::File(_) => "file", // .rrd, .png, .glb, …
re_smart_channel::SmartChannelSource::RrdHttpStream { .. } => "http",
re_smart_channel::SmartChannelSource::RrdWebEventListener { .. } => "web_event",
re_smart_channel::SmartChannelSource::Sdk => "sdk", // show()
re_smart_channel::SmartChannelSource::JsBytes { .. } => "javascript", // mediated via rerun-js
re_smart_channel::SmartChannelSource::Sdk => "sdk", // show()
re_smart_channel::SmartChannelSource::WsClient { .. } => "ws_client", // spawn()
re_smart_channel::SmartChannelSource::TcpServer { .. } => "tcp_server", // connect()
re_smart_channel::SmartChannelSource::Stdin => "stdin",
Expand Down
65 changes: 65 additions & 0 deletions crates/re_viewer/src/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ static GLOBAL: AccountingAllocator<std::alloc::System> =
#[wasm_bindgen]
pub struct WebHandle {
runner: eframe::WebRunner,

/// A dedicated smart channel used by the [`WebHandle::add_rrd_from_bytes`] API.
///
/// This exists because the direct bytes API is expected to submit many small RRD chunks
/// and allocating a new tx pair for each chunk doesn't make sense.
bytes_tx: Option<re_smart_channel::Sender<re_log_types::LogMsg>>,
}

#[wasm_bindgen]
Expand All @@ -28,6 +34,7 @@ impl WebHandle {

Self {
runner: eframe::WebRunner::new(),
bytes_tx: None,
}
}

Expand Down Expand Up @@ -107,6 +114,64 @@ impl WebHandle {
store_hub.remove_recording_by_uri(url);
}
}

/// Add an rrd to the viewer directly from a byte array.
#[wasm_bindgen]
pub fn add_rrd_from_bytes(&mut self, data: &[u8]) {
use std::{ops::ControlFlow, sync::Arc};
let Some(mut app) = self.runner.app_mut::<crate::App>() else {
return;
};

// TODO(jleibs): Should we provide a mechanism of the javascript signalling we're done?
if self.bytes_tx.is_none() {
let (tx, rx) = re_smart_channel::smart_channel(
re_smart_channel::SmartMessageSource::JsBytes,
re_smart_channel::SmartChannelSource::JsBytes,
);
self.bytes_tx = Some(tx);
app.add_receiver(rx);
}

if let Some(tx) = self.bytes_tx.clone() {
let data: Vec<u8> = data.to_vec();

let egui_ctx = app.re_ui.egui_ctx.clone();

let ui_waker = Box::new(move || {
// Spend a few more milliseconds decoding incoming messages,
// then trigger a repaint (https://github.com/rerun-io/rerun/issues/963):
egui_ctx.request_repaint_after(std::time::Duration::from_millis(10));
});

re_log_encoding::stream_rrd_from_http::web_decode::decode_rrd(
data,
Arc::new({
move |msg| {
ui_waker();
use re_log_encoding::stream_rrd_from_http::HttpMessage;
match msg {
HttpMessage::LogMsg(msg) => {
if tx.send(msg).is_ok() {
ControlFlow::Continue(())
} else {
re_log::info_once!("Failed to dispatch log message to viewer.");
ControlFlow::Break(())
}
}
// TODO(jleibs): Unclear what we want to do here. More data is coming.
HttpMessage::Success => ControlFlow::Continue(()),
HttpMessage::Failure(err) => {
tx.quit(Some(err))
.warn_on_err_once("Failed to send quit marker");
ControlFlow::Break(())
}
}
}
}),
);
}
}
}

fn create_app(
Expand Down
12 changes: 12 additions & 0 deletions rerun_js/web-viewer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,16 @@ export class WebViewer {
this.#canvas = null;
this.#handle = null;
}

/**
* Add an rrd to the viewer from a byte array.
*
* @param {Uint8Array} rrd stored in a byte array, received via some other side channel.
*/
add_rrd_from_bytes(rrd) {
jleibs marked this conversation as resolved.
Show resolved Hide resolved
if (!this.#handle) {
throw new Error(`attempted to add bytes to a stopped viewer`);
}
this.#handle.add_rrd_from_bytes(rrd);
}
}
Loading