Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a new javascript API for submitting an RRD that is stored directly as bytes #6189

Merged
merged 9 commits into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/re_log_encoding/src/stream_rrd_from_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ mod web_event_listener {
pub use web_event_listener::stream_rrd_from_event_listener;

#[cfg(target_arch = "wasm32")]
mod web_decode {
pub mod web_decode {
use super::{HttpMessage, HttpMessageCallback};
use std::sync::Arc;

Expand Down
16 changes: 15 additions & 1 deletion crates/re_smart_channel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ pub enum SmartChannelSource {
/// Used for the inline web viewer in a notebook.
RrdWebEventListener,

/// The channel was created in the context of a javascript client submitting an RRD directly as bytes.
JsChannel {
/// The name of the channel reported by the javascript client.
channel_name: String,
},

/// The channel was created in the context of loading data using a Rerun SDK sharing the same
/// process.
Sdk,
Expand Down Expand Up @@ -62,6 +68,7 @@ impl std::fmt::Display for SmartChannelSource {
Self::File(path) => path.display().fmt(f),
Self::RrdHttpStream { url } => url.fmt(f),
Self::RrdWebEventListener => "Web event listener".fmt(f),
Self::JsChannel { channel_name } => write!(f, "Javascript channel: {channel_name}"),
Self::Sdk => "SDK".fmt(f),
Self::WsClient { ws_server_url } => ws_server_url.fmt(f),
Self::TcpServer { port } => write!(f, "TCP server, port {port}"),
Expand All @@ -74,7 +81,10 @@ impl SmartChannelSource {
pub fn is_network(&self) -> bool {
match self {
Self::File(_) | Self::Sdk | Self::RrdWebEventListener | Self::Stdin => false,
Self::RrdHttpStream { .. } | Self::WsClient { .. } | Self::TcpServer { .. } => true,
Self::RrdHttpStream { .. }
| Self::WsClient { .. }
| Self::JsChannel { .. }
| Self::TcpServer { .. } => true,
}
}
}
Expand Down Expand Up @@ -104,6 +114,9 @@ pub enum SmartMessageSource {
/// Only applicable to web browser iframes.
RrdWebEventCallback,

/// The sender is a javascript client submitting an RRD directly as bytes.
JsChannelPush,

/// The sender is a Rerun SDK running from another thread in the same process.
Sdk,

Expand Down Expand Up @@ -133,6 +146,7 @@ impl std::fmt::Display for SmartMessageSource {
SmartMessageSource::File(path) => format!("file://{}", path.to_string_lossy()),
SmartMessageSource::RrdHttpStream { url } => format!("http://{url}"),
SmartMessageSource::RrdWebEventCallback => "web_callback".into(),
SmartMessageSource::JsChannelPush => "javascript".into(),
SmartMessageSource::Sdk => "sdk".into(),
SmartMessageSource::WsClient { ws_server_url } => ws_server_url.clone(),
SmartMessageSource::TcpClient { addr } => format!(
Expand Down
4 changes: 3 additions & 1 deletion crates/re_viewer/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ impl App {
SmartChannelSource::File(_) | SmartChannelSource::RrdHttpStream { .. } => false,

SmartChannelSource::WsClient { .. }
| SmartChannelSource::JsChannel { .. }
| SmartChannelSource::RrdWebEventListener
| SmartChannelSource::Sdk
| SmartChannelSource::TcpServer { .. }
Expand Down Expand Up @@ -1207,7 +1208,8 @@ impl App {
| SmartChannelSource::Stdin
| SmartChannelSource::RrdWebEventListener
| SmartChannelSource::Sdk
| SmartChannelSource::WsClient { .. } => {
| SmartChannelSource::WsClient { .. }
| SmartChannelSource::JsChannel { .. } => {
return true; // We expect data soon, so fade-in
}

Expand Down
3 changes: 2 additions & 1 deletion crates/re_viewer/src/app_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,8 @@ fn recording_config_entry<'cfgs>(
re_smart_channel::SmartChannelSource::Sdk
| re_smart_channel::SmartChannelSource::WsClient { .. }
| re_smart_channel::SmartChannelSource::TcpServer { .. }
| re_smart_channel::SmartChannelSource::Stdin => PlayState::Following,
| re_smart_channel::SmartChannelSource::Stdin
| re_smart_channel::SmartChannelSource::JsChannel { .. } => PlayState::Following,
}
} else {
PlayState::Following // No known source 🤷‍♂️
Expand Down
1 change: 1 addition & 0 deletions crates/re_viewer/src/ui/recordings_panel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ fn loading_receivers_ui(ctx: &ViewerContext<'_>, rx: &ReceiveSet<LogMsg>, ui: &m
SmartChannelSource::RrdHttpStream { url } => format!("Loading {url}…"),

SmartChannelSource::RrdWebEventListener
| SmartChannelSource::JsChannel { .. }
| SmartChannelSource::Sdk
| SmartChannelSource::WsClient { .. }
| SmartChannelSource::TcpServer { .. }
Expand Down
7 changes: 5 additions & 2 deletions crates/re_viewer/src/ui/top_panel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ fn connection_status_ui(ui: &mut egui::Ui, rx: &ReceiveSet<re_log_types::LogMsg>
re_smart_channel::SmartChannelSource::RrdWebEventListener
| re_smart_channel::SmartChannelSource::Sdk
| re_smart_channel::SmartChannelSource::WsClient { .. }
| re_smart_channel::SmartChannelSource::TcpServer { .. } => true,
| re_smart_channel::SmartChannelSource::TcpServer { .. }
| re_smart_channel::SmartChannelSource::JsChannel { .. } => true,
}
})
.collect_vec();
Expand Down Expand Up @@ -182,6 +183,7 @@ fn connection_status_ui(ui: &mut egui::Ui, rx: &ReceiveSet<re_log_types::LogMsg>
| SmartChannelSource::Stdin
| SmartChannelSource::RrdHttpStream { .. }
| SmartChannelSource::RrdWebEventListener
| SmartChannelSource::JsChannel { .. }
| SmartChannelSource::Sdk
| SmartChannelSource::WsClient { .. } => None,

Expand All @@ -206,7 +208,8 @@ fn connection_status_ui(ui: &mut egui::Ui, rx: &ReceiveSet<re_log_types::LogMsg>
re_smart_channel::SmartChannelSource::RrdHttpStream { url } => {
format!("Loading {url}…")
}
re_smart_channel::SmartChannelSource::RrdWebEventListener => {
re_smart_channel::SmartChannelSource::RrdWebEventListener
| re_smart_channel::SmartChannelSource::JsChannel { .. } => {
"Waiting for logging data…".to_owned()
}
re_smart_channel::SmartChannelSource::Sdk => {
Expand Down
5 changes: 3 additions & 2 deletions crates/re_viewer/src/viewer_analytics/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,9 @@ pub fn open_recording(
re_smart_channel::SmartChannelSource::File(_) => "file", // .rrd, .png, .glb, …
re_smart_channel::SmartChannelSource::RrdHttpStream { .. } => "http",
re_smart_channel::SmartChannelSource::RrdWebEventListener { .. } => "web_event",
re_smart_channel::SmartChannelSource::Sdk => "sdk", // show()
re_smart_channel::SmartChannelSource::WsClient { .. } => "ws_client", // spawn()
re_smart_channel::SmartChannelSource::JsChannel { .. } => "javascript", // mediated via rerun-js
re_smart_channel::SmartChannelSource::Sdk => "sdk", // show()
re_smart_channel::SmartChannelSource::WsClient { .. } => "ws_client", // spawn()
re_smart_channel::SmartChannelSource::TcpServer { .. } => "tcp_server", // connect()
re_smart_channel::SmartChannelSource::Stdin => "stdin",
});
Expand Down
100 changes: 100 additions & 0 deletions crates/re_viewer/src/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#![allow(clippy::mem_forget)] // False positives from #[wasm_bindgen] macro

use ahash::HashMap;
use wasm_bindgen::prelude::*;

use re_log::ResultExt as _;
Expand All @@ -17,6 +18,12 @@ static GLOBAL: AccountingAllocator<std::alloc::System> =
#[wasm_bindgen]
pub struct WebHandle {
runner: eframe::WebRunner,

/// A dedicated smart channel used by the [`WebHandle::add_rrd_from_bytes`] API.
///
/// This exists because the direct bytes API is expected to submit many small RRD chunks
/// and allocating a new tx pair for each chunk doesn't make sense.
tx_channels: HashMap<String, re_smart_channel::Sender<re_log_types::LogMsg>>,
}

#[wasm_bindgen]
Expand All @@ -28,6 +35,7 @@ impl WebHandle {

Self {
runner: eframe::WebRunner::new(),
tx_channels: Default::default(),
}
}

Expand Down Expand Up @@ -107,6 +115,98 @@ impl WebHandle {
store_hub.remove_recording_by_uri(url);
}
}

/// Open a new channel for streaming data.
///
/// It is an error to open a channel twice with the same id.
#[wasm_bindgen]
pub fn open_channel(&mut self, id: &str, channel_name: &str) {
let Some(mut app) = self.runner.app_mut::<crate::App>() else {
return;
};

if self.tx_channels.contains_key(id) {
re_log::warn!("Channel with id '{}' already exists.", id);
return;
}

let (tx, rx) = re_smart_channel::smart_channel(
re_smart_channel::SmartMessageSource::JsChannelPush,
re_smart_channel::SmartChannelSource::JsChannel {
channel_name: channel_name.to_owned(),
},
);

app.add_receiver(rx);
self.tx_channels.insert(id.to_owned(), tx);
}

/// Close an existing channel for streaming data.
///
/// No-op if the channel is already closed.
#[wasm_bindgen]
pub fn close_channel(&mut self, id: &str) {
let Some(app) = self.runner.app_mut::<crate::App>() else {
return;
};

if let Some(tx) = self.tx_channels.remove(id) {
tx.quit(None).warn_on_err_once("Failed to send quit marker");
}

// Request a repaint since closing the channel may update the top bar.
app.re_ui
.egui_ctx
.request_repaint_after(std::time::Duration::from_millis(10));
}

/// Add an rrd to the viewer directly from a byte array.
#[wasm_bindgen]
pub fn send_rrd_to_channel(&mut self, id: &str, data: &[u8]) {
use std::{ops::ControlFlow, sync::Arc};
let Some(app) = self.runner.app_mut::<crate::App>() else {
return;
};

if let Some(tx) = self.tx_channels.get(id).cloned() {
let data: Vec<u8> = data.to_vec();

let egui_ctx = app.re_ui.egui_ctx.clone();

let ui_waker = Box::new(move || {
// Spend a few more milliseconds decoding incoming messages,
// then trigger a repaint (https://github.com/rerun-io/rerun/issues/963):
egui_ctx.request_repaint_after(std::time::Duration::from_millis(10));
});

re_log_encoding::stream_rrd_from_http::web_decode::decode_rrd(
data,
Arc::new({
move |msg| {
ui_waker();
use re_log_encoding::stream_rrd_from_http::HttpMessage;
match msg {
HttpMessage::LogMsg(msg) => {
if tx.send(msg).is_ok() {
ControlFlow::Continue(())
} else {
re_log::info_once!("Failed to dispatch log message to viewer.");
ControlFlow::Break(())
}
}
// TODO(jleibs): Unclear what we want to do here. More data is coming.
HttpMessage::Success => ControlFlow::Continue(()),
HttpMessage::Failure(err) => {
tx.quit(Some(err))
.warn_on_err_once("Failed to send quit marker");
ControlFlow::Break(())
}
}
}
}),
);
}
}
}

fn create_app(
Expand Down
73 changes: 73 additions & 0 deletions rerun_js/web-viewer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ export class WebViewer {
throw new Error(`Web viewer crashed: ${this.#handle.panic_message()}`);
}

this.#state = "ready";

if (rrd) {
this.open(rrd);
}
Expand Down Expand Up @@ -131,4 +133,75 @@ export class WebViewer {
this.#canvas = null;
this.#handle = null;
}

/**
* Opens a new channel for sending log messages.
*
* The channel can be used to incrementally push `rrd` chunks into the viewer.
*
* @param {string} channel_name used to identify the channel.
*
* @returns {LogChannel}
*/
open_channel(channel_name = "rerun-io/web-viewer") {
if (!this.#handle) throw new Error("...");
const id = crypto.randomUUID();
this.#handle.open_channel(id, channel_name);
const on_send = (/** @type {Uint8Array} */ data) => {
if (!this.#handle) throw new Error("...");
this.#handle.send_rrd_to_channel(id, data);
};
const on_close = () => {
if (!this.#handle) throw new Error("...");
this.#handle.close_channel(id);
};
const get_state = () => this.#state;
return new LogChannel(on_send, on_close, get_state);
}
}

export class LogChannel {
#on_send;
#on_close;
#get_state;
#closed = false;

/** @internal
*
* @param {(data: Uint8Array) => void} on_send
* @param {() => void} on_close
* @param {() => 'ready' | 'starting' | 'stopped'} get_state
*/
constructor(on_send, on_close, get_state) {
this.#on_send = on_send;
this.#on_close = on_close;
this.#get_state = get_state;
}

get ready() {
return !this.#closed && this.#get_state() === "ready";
}

/**
* Send an `rrd` containing log messages to the viewer.
*
* Does nothing if `!this.ready`.
*
* @param {Uint8Array} rrd_bytes Is an rrd file stored in a byte array, received via some other side channel.
*/
send_rrd(rrd_bytes) {
if (!this.ready) return;
this.#on_send(rrd_bytes);
}

/**
* Close the channel.
*
* Does nothing if `!this.ready`.
*/
close() {
if (!this.ready) return;
this.#on_close();
this.#closed = true;
}
}
Loading