Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Load .rrd file over HTTP #1600

Merged
merged 10 commits into from
Mar 17, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion crates/re_smart_channel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ pub enum Source {
/// The source if a file on disk
File { path: std::path::PathBuf },

/// Streaming an `.rrd` file over http.
RrdHttpStream { url: String },

/// The source is the logging sdk directly, same process.
Sdk,

Expand All @@ -33,7 +36,7 @@ impl Source {
pub fn is_network(&self) -> bool {
match self {
Self::File { .. } | Self::Sdk => false,
Self::WsClient { .. } | Self::TcpServer { .. } => true,
Self::RrdHttpStream { .. } | Self::WsClient { .. } | Self::TcpServer { .. } => true,
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions crates/re_viewer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ egui_dock = { workspace = true, features = ["serde"] }
egui_extras = { workspace = true, features = ["tracing"] }
egui-notify = "0.6"
egui-wgpu.workspace = true
ehttp = "0.2"
enumset.workspace = true
glam = { workspace = true, features = [
"mint",
Expand Down Expand Up @@ -125,6 +126,12 @@ winapi = "0.3.9"
[target.'cfg(target_arch = "wasm32")'.dependencies]
console_error_panic_hook = "0.1.6"
wasm-bindgen-futures = "0.4"
js-sys = "0.3"
wasm-bindgen = "0.2"

[dependencies.web-sys]
version = "0.3.52"
features = ["Window"]


[build-dependencies]
Expand Down
6 changes: 5 additions & 1 deletion crates/re_viewer/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,9 @@ fn wait_screen_ui(ui: &mut egui::Ui, rx: &Receiver<LogMsg>) {
re_smart_channel::Source::File { path } => {
ui.strong(format!("Loading {}…", path.display()));
}
re_smart_channel::Source::RrdHttpStream { url } => {
ui.strong(format!("Loading {url}…"));
}
re_smart_channel::Source::Sdk => {
ready_and_waiting(ui, "Waiting for logging data from SDK");
}
Expand Down Expand Up @@ -1781,7 +1784,8 @@ fn new_recording_confg(
re_smart_channel::Source::File { .. } => PlayState::Playing,

// Live data - follow it!
re_smart_channel::Source::Sdk
re_smart_channel::Source::RrdHttpStream { .. }
| re_smart_channel::Source::Sdk
| re_smart_channel::Source::WsClient { .. }
| re_smart_channel::Source::TcpServer { .. } => PlayState::Following,
};
Expand Down
123 changes: 123 additions & 0 deletions crates/re_viewer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,126 @@ pub fn wake_up_ui_thread_on_each_msg<T: Send + 'static>(
.unwrap();
new_rx
}

// ---------------------------------------------------------------------------

pub fn stream_rrd_from_http_to_channel(
url: String,
) -> re_smart_channel::Receiver<re_log_types::LogMsg> {
let (tx, rx) = re_smart_channel::smart_channel(re_smart_channel::Source::RrdHttpStream {
url: url.clone(),
});
stream_rrd_from_http(
url,
Box::new(move |msg| {
tx.send(msg).ok();
}),
);
rx
}

pub fn stream_rrd_from_http(url: String, on_msg: Box<dyn Fn(re_log_types::LogMsg) + Send>) {
re_log::debug!("Downloading .rrd file from {url:?}…");

// TODO(emilk): stream the http request, progressively decoding the .rrd file.
ehttp::fetch(ehttp::Request::get(&url), move |result| match result {
Ok(response) => {
if response.ok {
re_log::debug!("Decoding .rrd file from {url:?}…");
decode_rrd(response.bytes, on_msg);
} else {
re_log::error!(
"Failed to fetch .rrd file from {url}: {} {}",
response.status,
response.status_text
);
}
}
Err(err) => {
re_log::error!("Failed to fetch .rrd file from {url}: {err}");
}
});
}

#[cfg(not(target_arch = "wasm32"))]
#[allow(clippy::needless_pass_by_value)] // must match wasm version
fn decode_rrd(rrd_bytes: Vec<u8>, on_msg: Box<dyn Fn(re_log_types::LogMsg) + Send>) {
match re_log_types::encoding::Decoder::new(rrd_bytes.as_slice()) {
Ok(decoder) => {
for msg in decoder {
match msg {
Ok(msg) => {
on_msg(msg);
}
Err(err) => {
re_log::warn_once!("Failed to decode message: {err}");
}
}
}
}
Err(err) => {
re_log::error!("Failed to decode .rrd: {err}");
}
}
}

#[cfg(target_arch = "wasm32")]
mod web_decode {
pub fn decode_rrd(rrd_bytes: Vec<u8>, on_msg: Box<dyn Fn(re_log_types::LogMsg) + Send>) {
wasm_bindgen_futures::spawn_local(decode_rrd_async(rrd_bytes, on_msg))
}

/// Decodes the file in chunks, with an yield between each chunk.
///
/// This is cooperative multi-tasking.
async fn decode_rrd_async(
rrd_bytes: Vec<u8>,
on_msg: Box<dyn Fn(re_log_types::LogMsg) + Send>,
) {
let mut last_yield = instant::Instant::now();

match re_log_types::encoding::Decoder::new(rrd_bytes.as_slice()) {
Ok(decoder) => {
for msg in decoder {
match msg {
Ok(msg) => {
on_msg(msg);
}
Err(err) => {
re_log::warn_once!("Failed to decode message: {err}");
}
}

if last_yield.elapsed() > instant::Duration::from_millis(10) {
// yield to the ui task
yield_().await;
last_yield = instant::Instant::now();
}
}
}
Err(err) => {
re_log::error!("Failed to decode .rrd: {err}");
}
}
}

// Yield to other tasks
async fn yield_() {
sleep_ms(1).await; // TODO(emilk): create a better async yield function
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

// Hack to get async sleep on wasm
async fn sleep_ms(millis: i32) {
let mut cb = |resolve: js_sys::Function, _reject: js_sys::Function| {
web_sys::window()
.unwrap()
.set_timeout_with_callback_and_timeout_and_arguments_0(&resolve, millis)
.expect("Failed to call set_timeout");
};
let p = js_sys::Promise::new(&mut cb);
wasm_bindgen_futures::JsFuture::from(p).await.unwrap();
}
}

#[cfg(target_arch = "wasm32")]
use web_decode::decode_rrd;
60 changes: 35 additions & 25 deletions crates/re_viewer/src/remote_viewer_app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,36 +44,41 @@ impl RemoteViewerApp {

re_log::info!("Connecting to WS server at {:?}…", self.url);

let connection =
re_ws_comms::Connection::viewer_to_server(self.url.clone(), move |binary: Vec<u8>| {
match re_ws_comms::decode_log_msg(&binary) {
Ok(log_msg) => {
if tx.send(log_msg).is_ok() {
egui_ctx.request_repaint(); // Wake up UI thread
std::ops::ControlFlow::Continue(())
} else {
re_log::info!("Failed to send log message to viewer - closing");
std::ops::ControlFlow::Break(())
}
}
Err(err) => {
re_log::error!("Failed to parse message: {}", re_error::format(&err));
let callback = move |binary: Vec<u8>| {
match re_ws_comms::decode_log_msg(&binary) {
Ok(log_msg) => {
if tx.send(log_msg).is_ok() {
egui_ctx.request_repaint(); // Wake up UI thread
std::ops::ControlFlow::Continue(())
} else {
re_log::info!("Failed to send log message to viewer - closing");
std::ops::ControlFlow::Break(())
}
}
})
.unwrap(); // TODO(emilk): handle error
Err(err) => {
re_log::error!("Failed to parse message: {}", re_error::format(&err));
std::ops::ControlFlow::Break(())
}
}
};

let app = crate::App::from_receiver(
self.build_info,
&self.app_env,
self.startup_options,
self.re_ui.clone(),
storage,
rx,
);
match re_ws_comms::Connection::viewer_to_server(self.url.clone(), callback) {
Ok(connection) => {
let app = crate::App::from_receiver(
self.build_info,
&self.app_env,
self.startup_options,
self.re_ui.clone(),
storage,
rx,
);

self.app = Some((connection, app));
self.app = Some((connection, app));
}
Err(err) => {
re_log::error!("Failed to connect to {:?}: {}", self.url, err);
}
}
}

#[cfg(not(target_arch = "wasm32"))]
Expand Down Expand Up @@ -118,6 +123,11 @@ impl eframe::App for RemoteViewerApp {

if let Some((_, app)) = &mut self.app {
app.update(egui_ctx, frame);
} else {
egui::CentralPanel::default().show(egui_ctx, |ui| {
// TODO(emilk): show the error message.
ui.label("An error occurred.\nCheck the debug console for details.");
});
}
}
}
3 changes: 2 additions & 1 deletion crates/re_viewer/src/viewer_analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,8 @@ impl ViewerAnalytics {
if let Some(data_source) = &log_db.data_source {
let data_source = match data_source {
re_smart_channel::Source::File { .. } => "file", // .rrd
re_smart_channel::Source::Sdk => "sdk", // show()
re_smart_channel::Source::RrdHttpStream { .. } => "http",
re_smart_channel::Source::Sdk => "sdk", // show()
re_smart_channel::Source::WsClient { .. } => "ws_client", // spawn()
re_smart_channel::Source::TcpServer { .. } => "tcp_server", // connect()
};
Expand Down
54 changes: 43 additions & 11 deletions crates/re_viewer/src/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,15 @@ static GLOBAL: AccountingAllocator<std::alloc::System> =
AccountingAllocator::new(std::alloc::System);

/// This is the entry-point for all the Wasm.
///
/// This is called once from the HTML.
/// It loads the app, installs some callbacks, then returns.
/// The `url` is an optional URL to either an .rrd file over http, or a Rerun WebSocket server.
#[wasm_bindgen]
pub async fn start(canvas_id: &str) -> std::result::Result<(), eframe::wasm_bindgen::JsValue> {
pub async fn start(
canvas_id: &str,
url: Option<String>,
) -> std::result::Result<(), eframe::wasm_bindgen::JsValue> {
// Make sure panics are logged using `console.error`.
console_error_panic_hook::set_once();

Expand All @@ -30,16 +35,43 @@ pub async fn start(canvas_id: &str) -> std::result::Result<(), eframe::wasm_bind
let app_env = crate::AppEnvironment::Web;
let startup_options = crate::StartupOptions::default();
let re_ui = crate::customize_eframe(cc);
let url = get_url(&cc.integration_info);
let app = crate::RemoteViewerApp::new(
build_info,
app_env,
startup_options,
re_ui,
cc.storage,
url,
);
Box::new(app)
let url = url.unwrap_or_else(|| get_url(&cc.integration_info));

if url.starts_with("http") || url.ends_with(".rrd") {
emilk marked this conversation as resolved.
Show resolved Hide resolved
// Download an .rrd file over http:

let (tx, rx) =
re_smart_channel::smart_channel(re_smart_channel::Source::RrdHttpStream {
url: url.clone(),
});
let egui_ctx = cc.egui_ctx.clone();
crate::stream_rrd_from_http(
url,
Box::new(move |msg| {
egui_ctx.request_repaint(); // wake up ui thread
tx.send(msg).ok();
}),
);

Box::new(crate::App::from_receiver(
build_info,
&app_env,
startup_options,
re_ui,
cc.storage,
rx,
))
} else {
// Connect to a Rerun server over WebSockets.
Box::new(crate::RemoteViewerApp::new(
build_info,
app_env,
startup_options,
re_ui,
cc.storage,
url,
))
}
}),
)
.await?;
Expand Down
Loading