Skip to content

Commit

Permalink
Add a new javascript API for submitting an RRD that is stored directl…
Browse files Browse the repository at this point in the history
…y as bytes (#6189)

### What
- This was motivated by #2013

This API means that an RRD file transmitted over a side-channel such as
gradio can be injected directly into the viewer instead of needing to
point the viewer at a separate url.

Tested with an adhoc svelte component:
```ts
<script lang="ts">
    import { WebViewer, LogChannel } from "@rerun-io/web-viewer";

    import { onMount } from "svelte";

    let rr: WebViewer;
    let ref: HTMLDivElement;
    let channel: LogChannel;

    onMount(() => {
        rr = new WebViewer();
        rr.start(undefined, ref).then(() => {
            channel = rr.open_channel("gradio");
        });

        return () => {
            channel.close();
            rr.stop();
        };
    });

    let data1 = '...'
    let data2 = '...'
    let data3 = '...'

    function push(data: string) {
        var intermediate = atob(data);
        var buff = new Uint8Array(intermediate.length);
        for (var i = 0; i < intermediate.length; i++) {
            {
                buff[i] = intermediate.charCodeAt(i);
            }
        }
        channel.send_rrd(buff);
    }

</script>

<div class="container">
    <button on:click={() => push(data1)}>
    Data1
    </button>
    <button on:click={() => push(data2)}>
    Data2
    </button>
    <button on:click={() => push(data3)}>
    Data3
    </button>
</div>

<div class="viewer" bind:this={ref} />
```

### Checklist
* [x] I have read and agree to [Contributor
Guide](https://github.com/rerun-io/rerun/blob/main/CONTRIBUTING.md) and
the [Code of
Conduct](https://github.com/rerun-io/rerun/blob/main/CODE_OF_CONDUCT.md)
* [x] I've included a screenshot or gif (if applicable)
* [x] I have tested the web demo (if applicable):
* Using examples from latest `main` build:
[rerun.io/viewer](https://rerun.io/viewer/pr/6189?manifest_url=https://app.rerun.io/version/main/examples_manifest.json)
* Using full set of examples from `nightly` build:
[rerun.io/viewer](https://rerun.io/viewer/pr/6189?manifest_url=https://app.rerun.io/version/nightly/examples_manifest.json)
* [x] The PR title and labels are set such as to maximize their
usefulness for the next release's CHANGELOG
* [x] If applicable, add a new check to the [release
checklist](https://github.com/rerun-io/rerun/blob/main/tests/python/release_checklist)!

- [PR Build Summary](https://build.rerun.io/pr/6189)
- [Recent benchmark results](https://build.rerun.io/graphs/crates.html)
- [Wasm size tracking](https://build.rerun.io/graphs/sizes.html)

To run all checks from `main`, comment on the PR with `@rerun-bot
full-check`.
  • Loading branch information
jleibs authored May 2, 2024
1 parent a9c9706 commit 01efdf5
Show file tree
Hide file tree
Showing 9 changed files with 203 additions and 8 deletions.
2 changes: 1 addition & 1 deletion crates/re_log_encoding/src/stream_rrd_from_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ mod web_event_listener {
pub use web_event_listener::stream_rrd_from_event_listener;

#[cfg(target_arch = "wasm32")]
mod web_decode {
pub mod web_decode {
use super::{HttpMessage, HttpMessageCallback};
use std::sync::Arc;

Expand Down
16 changes: 15 additions & 1 deletion crates/re_smart_channel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ pub enum SmartChannelSource {
/// Used for the inline web viewer in a notebook.
RrdWebEventListener,

/// The channel was created in the context of a javascript client submitting an RRD directly as bytes.
JsChannel {
/// The name of the channel reported by the javascript client.
channel_name: String,
},

/// The channel was created in the context of loading data using a Rerun SDK sharing the same
/// process.
Sdk,
Expand Down Expand Up @@ -62,6 +68,7 @@ impl std::fmt::Display for SmartChannelSource {
Self::File(path) => path.display().fmt(f),
Self::RrdHttpStream { url } => url.fmt(f),
Self::RrdWebEventListener => "Web event listener".fmt(f),
Self::JsChannel { channel_name } => write!(f, "Javascript channel: {channel_name}"),
Self::Sdk => "SDK".fmt(f),
Self::WsClient { ws_server_url } => ws_server_url.fmt(f),
Self::TcpServer { port } => write!(f, "TCP server, port {port}"),
Expand All @@ -74,7 +81,10 @@ impl SmartChannelSource {
pub fn is_network(&self) -> bool {
match self {
Self::File(_) | Self::Sdk | Self::RrdWebEventListener | Self::Stdin => false,
Self::RrdHttpStream { .. } | Self::WsClient { .. } | Self::TcpServer { .. } => true,
Self::RrdHttpStream { .. }
| Self::WsClient { .. }
| Self::JsChannel { .. }
| Self::TcpServer { .. } => true,
}
}
}
Expand Down Expand Up @@ -104,6 +114,9 @@ pub enum SmartMessageSource {
/// Only applicable to web browser iframes.
RrdWebEventCallback,

/// The sender is a javascript client submitting an RRD directly as bytes.
JsChannelPush,

/// The sender is a Rerun SDK running from another thread in the same process.
Sdk,

Expand Down Expand Up @@ -133,6 +146,7 @@ impl std::fmt::Display for SmartMessageSource {
SmartMessageSource::File(path) => format!("file://{}", path.to_string_lossy()),
SmartMessageSource::RrdHttpStream { url } => format!("http://{url}"),
SmartMessageSource::RrdWebEventCallback => "web_callback".into(),
SmartMessageSource::JsChannelPush => "javascript".into(),
SmartMessageSource::Sdk => "sdk".into(),
SmartMessageSource::WsClient { ws_server_url } => ws_server_url.clone(),
SmartMessageSource::TcpClient { addr } => format!(
Expand Down
4 changes: 3 additions & 1 deletion crates/re_viewer/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ impl App {
SmartChannelSource::File(_) | SmartChannelSource::RrdHttpStream { .. } => false,

SmartChannelSource::WsClient { .. }
| SmartChannelSource::JsChannel { .. }
| SmartChannelSource::RrdWebEventListener
| SmartChannelSource::Sdk
| SmartChannelSource::TcpServer { .. }
Expand Down Expand Up @@ -1207,7 +1208,8 @@ impl App {
| SmartChannelSource::Stdin
| SmartChannelSource::RrdWebEventListener
| SmartChannelSource::Sdk
| SmartChannelSource::WsClient { .. } => {
| SmartChannelSource::WsClient { .. }
| SmartChannelSource::JsChannel { .. } => {
return true; // We expect data soon, so fade-in
}

Expand Down
3 changes: 2 additions & 1 deletion crates/re_viewer/src/app_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,8 @@ fn recording_config_entry<'cfgs>(
re_smart_channel::SmartChannelSource::Sdk
| re_smart_channel::SmartChannelSource::WsClient { .. }
| re_smart_channel::SmartChannelSource::TcpServer { .. }
| re_smart_channel::SmartChannelSource::Stdin => PlayState::Following,
| re_smart_channel::SmartChannelSource::Stdin
| re_smart_channel::SmartChannelSource::JsChannel { .. } => PlayState::Following,
}
} else {
PlayState::Following // No known source 🤷‍♂️
Expand Down
1 change: 1 addition & 0 deletions crates/re_viewer/src/ui/recordings_panel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ fn loading_receivers_ui(ctx: &ViewerContext<'_>, rx: &ReceiveSet<LogMsg>, ui: &m
SmartChannelSource::RrdHttpStream { url } => format!("Loading {url}…"),

SmartChannelSource::RrdWebEventListener
| SmartChannelSource::JsChannel { .. }
| SmartChannelSource::Sdk
| SmartChannelSource::WsClient { .. }
| SmartChannelSource::TcpServer { .. }
Expand Down
7 changes: 5 additions & 2 deletions crates/re_viewer/src/ui/top_panel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ fn connection_status_ui(ui: &mut egui::Ui, rx: &ReceiveSet<re_log_types::LogMsg>
re_smart_channel::SmartChannelSource::RrdWebEventListener
| re_smart_channel::SmartChannelSource::Sdk
| re_smart_channel::SmartChannelSource::WsClient { .. }
| re_smart_channel::SmartChannelSource::TcpServer { .. } => true,
| re_smart_channel::SmartChannelSource::TcpServer { .. }
| re_smart_channel::SmartChannelSource::JsChannel { .. } => true,
}
})
.collect_vec();
Expand Down Expand Up @@ -182,6 +183,7 @@ fn connection_status_ui(ui: &mut egui::Ui, rx: &ReceiveSet<re_log_types::LogMsg>
| SmartChannelSource::Stdin
| SmartChannelSource::RrdHttpStream { .. }
| SmartChannelSource::RrdWebEventListener
| SmartChannelSource::JsChannel { .. }
| SmartChannelSource::Sdk
| SmartChannelSource::WsClient { .. } => None,

Expand All @@ -206,7 +208,8 @@ fn connection_status_ui(ui: &mut egui::Ui, rx: &ReceiveSet<re_log_types::LogMsg>
re_smart_channel::SmartChannelSource::RrdHttpStream { url } => {
format!("Loading {url}…")
}
re_smart_channel::SmartChannelSource::RrdWebEventListener => {
re_smart_channel::SmartChannelSource::RrdWebEventListener
| re_smart_channel::SmartChannelSource::JsChannel { .. } => {
"Waiting for logging data…".to_owned()
}
re_smart_channel::SmartChannelSource::Sdk => {
Expand Down
5 changes: 3 additions & 2 deletions crates/re_viewer/src/viewer_analytics/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,9 @@ pub fn open_recording(
re_smart_channel::SmartChannelSource::File(_) => "file", // .rrd, .png, .glb, …
re_smart_channel::SmartChannelSource::RrdHttpStream { .. } => "http",
re_smart_channel::SmartChannelSource::RrdWebEventListener { .. } => "web_event",
re_smart_channel::SmartChannelSource::Sdk => "sdk", // show()
re_smart_channel::SmartChannelSource::WsClient { .. } => "ws_client", // spawn()
re_smart_channel::SmartChannelSource::JsChannel { .. } => "javascript", // mediated via rerun-js
re_smart_channel::SmartChannelSource::Sdk => "sdk", // show()
re_smart_channel::SmartChannelSource::WsClient { .. } => "ws_client", // spawn()
re_smart_channel::SmartChannelSource::TcpServer { .. } => "tcp_server", // connect()
re_smart_channel::SmartChannelSource::Stdin => "stdin",
});
Expand Down
100 changes: 100 additions & 0 deletions crates/re_viewer/src/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#![allow(clippy::mem_forget)] // False positives from #[wasm_bindgen] macro

use ahash::HashMap;
use wasm_bindgen::prelude::*;

use re_log::ResultExt as _;
Expand All @@ -17,6 +18,12 @@ static GLOBAL: AccountingAllocator<std::alloc::System> =
#[wasm_bindgen]
pub struct WebHandle {
runner: eframe::WebRunner,

/// A dedicated smart channel used by the [`WebHandle::add_rrd_from_bytes`] API.
///
/// This exists because the direct bytes API is expected to submit many small RRD chunks
/// and allocating a new tx pair for each chunk doesn't make sense.
tx_channels: HashMap<String, re_smart_channel::Sender<re_log_types::LogMsg>>,
}

#[wasm_bindgen]
Expand All @@ -28,6 +35,7 @@ impl WebHandle {

Self {
runner: eframe::WebRunner::new(),
tx_channels: Default::default(),
}
}

Expand Down Expand Up @@ -107,6 +115,98 @@ impl WebHandle {
store_hub.remove_recording_by_uri(url);
}
}

/// Open a new channel for streaming data.
///
/// It is an error to open a channel twice with the same id.
#[wasm_bindgen]
pub fn open_channel(&mut self, id: &str, channel_name: &str) {
let Some(mut app) = self.runner.app_mut::<crate::App>() else {
return;
};

if self.tx_channels.contains_key(id) {
re_log::warn!("Channel with id '{}' already exists.", id);
return;
}

let (tx, rx) = re_smart_channel::smart_channel(
re_smart_channel::SmartMessageSource::JsChannelPush,
re_smart_channel::SmartChannelSource::JsChannel {
channel_name: channel_name.to_owned(),
},
);

app.add_receiver(rx);
self.tx_channels.insert(id.to_owned(), tx);
}

/// Close an existing channel for streaming data.
///
/// No-op if the channel is already closed.
#[wasm_bindgen]
pub fn close_channel(&mut self, id: &str) {
let Some(app) = self.runner.app_mut::<crate::App>() else {
return;
};

if let Some(tx) = self.tx_channels.remove(id) {
tx.quit(None).warn_on_err_once("Failed to send quit marker");
}

// Request a repaint since closing the channel may update the top bar.
app.re_ui
.egui_ctx
.request_repaint_after(std::time::Duration::from_millis(10));
}

/// Add an rrd to the viewer directly from a byte array.
#[wasm_bindgen]
pub fn send_rrd_to_channel(&mut self, id: &str, data: &[u8]) {
use std::{ops::ControlFlow, sync::Arc};
let Some(app) = self.runner.app_mut::<crate::App>() else {
return;
};

if let Some(tx) = self.tx_channels.get(id).cloned() {
let data: Vec<u8> = data.to_vec();

let egui_ctx = app.re_ui.egui_ctx.clone();

let ui_waker = Box::new(move || {
// Spend a few more milliseconds decoding incoming messages,
// then trigger a repaint (https://github.com/rerun-io/rerun/issues/963):
egui_ctx.request_repaint_after(std::time::Duration::from_millis(10));
});

re_log_encoding::stream_rrd_from_http::web_decode::decode_rrd(
data,
Arc::new({
move |msg| {
ui_waker();
use re_log_encoding::stream_rrd_from_http::HttpMessage;
match msg {
HttpMessage::LogMsg(msg) => {
if tx.send(msg).is_ok() {
ControlFlow::Continue(())
} else {
re_log::info_once!("Failed to dispatch log message to viewer.");
ControlFlow::Break(())
}
}
// TODO(jleibs): Unclear what we want to do here. More data is coming.
HttpMessage::Success => ControlFlow::Continue(()),
HttpMessage::Failure(err) => {
tx.quit(Some(err))
.warn_on_err_once("Failed to send quit marker");
ControlFlow::Break(())
}
}
}
}),
);
}
}
}

fn create_app(
Expand Down
73 changes: 73 additions & 0 deletions rerun_js/web-viewer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ export class WebViewer {
throw new Error(`Web viewer crashed: ${this.#handle.panic_message()}`);
}

this.#state = "ready";

if (rrd) {
this.open(rrd);
}
Expand Down Expand Up @@ -131,4 +133,75 @@ export class WebViewer {
this.#canvas = null;
this.#handle = null;
}

/**
* Opens a new channel for sending log messages.
*
* The channel can be used to incrementally push `rrd` chunks into the viewer.
*
* @param {string} channel_name used to identify the channel.
*
* @returns {LogChannel}
*/
open_channel(channel_name = "rerun-io/web-viewer") {
if (!this.#handle) throw new Error("...");
const id = crypto.randomUUID();
this.#handle.open_channel(id, channel_name);
const on_send = (/** @type {Uint8Array} */ data) => {
if (!this.#handle) throw new Error("...");
this.#handle.send_rrd_to_channel(id, data);
};
const on_close = () => {
if (!this.#handle) throw new Error("...");
this.#handle.close_channel(id);
};
const get_state = () => this.#state;
return new LogChannel(on_send, on_close, get_state);
}
}

export class LogChannel {
#on_send;
#on_close;
#get_state;
#closed = false;

/** @internal
*
* @param {(data: Uint8Array) => void} on_send
* @param {() => void} on_close
* @param {() => 'ready' | 'starting' | 'stopped'} get_state
*/
constructor(on_send, on_close, get_state) {
this.#on_send = on_send;
this.#on_close = on_close;
this.#get_state = get_state;
}

get ready() {
return !this.#closed && this.#get_state() === "ready";
}

/**
* Send an `rrd` containing log messages to the viewer.
*
* Does nothing if `!this.ready`.
*
* @param {Uint8Array} rrd_bytes Is an rrd file stored in a byte array, received via some other side channel.
*/
send_rrd(rrd_bytes) {
if (!this.ready) return;
this.#on_send(rrd_bytes);
}

/**
* Close the channel.
*
* Does nothing if `!this.ready`.
*/
close() {
if (!this.ready) return;
this.#on_close();
this.#closed = true;
}
}

0 comments on commit 01efdf5

Please sign in to comment.