Skip to content

Commit

Permalink
Load .rrd file over HTTP (#1600)
Browse files Browse the repository at this point in the history
* Add option to stream .rrd files over http

* the `url=` parameter in the web-viewer can point to an rrd over http

* You can specify the .rrd http from the index.html

* Better handle WS connection errors

* Better logging and logic

* fix typo

* Cleanup

* Progressively decode the .rrd file on web

* Refactor: move rrd/http streaming to own file

* Nicer protocol classification
  • Loading branch information
emilk authored Mar 17, 2023
1 parent 9ed0f27 commit 5e88054
Show file tree
Hide file tree
Showing 11 changed files with 323 additions and 67 deletions.
17 changes: 17 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion crates/re_smart_channel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ pub enum Source {
/// The source if a file on disk
File { path: std::path::PathBuf },

/// Streaming an `.rrd` file over http.
RrdHttpStream { url: String },

/// The source is the logging sdk directly, same process.
Sdk,

Expand All @@ -33,7 +36,7 @@ impl Source {
pub fn is_network(&self) -> bool {
match self {
Self::File { .. } | Self::Sdk => false,
Self::WsClient { .. } | Self::TcpServer { .. } => true,
Self::RrdHttpStream { .. } | Self::WsClient { .. } | Self::TcpServer { .. } => true,
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions crates/re_viewer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ egui_dock = { workspace = true, features = ["serde"] }
egui_extras = { workspace = true, features = ["tracing"] }
egui-notify = "0.6"
egui-wgpu.workspace = true
ehttp = "0.2"
enumset.workspace = true
glam = { workspace = true, features = [
"mint",
Expand Down Expand Up @@ -125,6 +126,12 @@ winapi = "0.3.9"
[target.'cfg(target_arch = "wasm32")'.dependencies]
console_error_panic_hook = "0.1.6"
wasm-bindgen-futures = "0.4"
js-sys = "0.3"
wasm-bindgen = "0.2"

[dependencies.web-sys]
version = "0.3.52"
features = ["Window"]


[build-dependencies]
Expand Down
6 changes: 5 additions & 1 deletion crates/re_viewer/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,9 @@ fn wait_screen_ui(ui: &mut egui::Ui, rx: &Receiver<LogMsg>) {
re_smart_channel::Source::File { path } => {
ui.strong(format!("Loading {}…", path.display()));
}
re_smart_channel::Source::RrdHttpStream { url } => {
ui.strong(format!("Loading {url}…"));
}
re_smart_channel::Source::Sdk => {
ready_and_waiting(ui, "Waiting for logging data from SDK");
}
Expand Down Expand Up @@ -1781,7 +1784,8 @@ fn new_recording_confg(
re_smart_channel::Source::File { .. } => PlayState::Playing,

// Live data - follow it!
re_smart_channel::Source::Sdk
re_smart_channel::Source::RrdHttpStream { .. }
| re_smart_channel::Source::Sdk
| re_smart_channel::Source::WsClient { .. }
| re_smart_channel::Source::TcpServer { .. } => PlayState::Following,
};
Expand Down
1 change: 1 addition & 0 deletions crates/re_viewer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub mod env_vars;
pub mod math;
mod misc;
mod remote_viewer_app;
pub mod stream_rrd_from_http;
mod ui;
mod viewer_analytics;

Expand Down
60 changes: 35 additions & 25 deletions crates/re_viewer/src/remote_viewer_app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,36 +44,41 @@ impl RemoteViewerApp {

re_log::info!("Connecting to WS server at {:?}…", self.url);

let connection =
re_ws_comms::Connection::viewer_to_server(self.url.clone(), move |binary: Vec<u8>| {
match re_ws_comms::decode_log_msg(&binary) {
Ok(log_msg) => {
if tx.send(log_msg).is_ok() {
egui_ctx.request_repaint(); // Wake up UI thread
std::ops::ControlFlow::Continue(())
} else {
re_log::info!("Failed to send log message to viewer - closing");
std::ops::ControlFlow::Break(())
}
}
Err(err) => {
re_log::error!("Failed to parse message: {}", re_error::format(&err));
let callback = move |binary: Vec<u8>| {
match re_ws_comms::decode_log_msg(&binary) {
Ok(log_msg) => {
if tx.send(log_msg).is_ok() {
egui_ctx.request_repaint(); // Wake up UI thread
std::ops::ControlFlow::Continue(())
} else {
re_log::info!("Failed to send log message to viewer - closing");
std::ops::ControlFlow::Break(())
}
}
})
.unwrap(); // TODO(emilk): handle error
Err(err) => {
re_log::error!("Failed to parse message: {}", re_error::format(&err));
std::ops::ControlFlow::Break(())
}
}
};

let app = crate::App::from_receiver(
self.build_info,
&self.app_env,
self.startup_options,
self.re_ui.clone(),
storage,
rx,
);
match re_ws_comms::Connection::viewer_to_server(self.url.clone(), callback) {
Ok(connection) => {
let app = crate::App::from_receiver(
self.build_info,
&self.app_env,
self.startup_options,
self.re_ui.clone(),
storage,
rx,
);

self.app = Some((connection, app));
self.app = Some((connection, app));
}
Err(err) => {
re_log::error!("Failed to connect to {:?}: {}", self.url, err);
}
}
}

#[cfg(not(target_arch = "wasm32"))]
Expand Down Expand Up @@ -118,6 +123,11 @@ impl eframe::App for RemoteViewerApp {

if let Some((_, app)) = &mut self.app {
app.update(egui_ctx, frame);
} else {
egui::CentralPanel::default().show(egui_ctx, |ui| {
// TODO(emilk): show the error message.
ui.label("An error occurred.\nCheck the debug console for details.");
});
}
}
}
120 changes: 120 additions & 0 deletions crates/re_viewer/src/stream_rrd_from_http.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
pub fn stream_rrd_from_http_to_channel(
url: String,
) -> re_smart_channel::Receiver<re_log_types::LogMsg> {
let (tx, rx) = re_smart_channel::smart_channel(re_smart_channel::Source::RrdHttpStream {
url: url.clone(),
});
stream_rrd_from_http(
url,
Box::new(move |msg| {
tx.send(msg).ok();
}),
);
rx
}

pub fn stream_rrd_from_http(url: String, on_msg: Box<dyn Fn(re_log_types::LogMsg) + Send>) {
re_log::debug!("Downloading .rrd file from {url:?}…");

// TODO(emilk): stream the http request, progressively decoding the .rrd file.
ehttp::fetch(ehttp::Request::get(&url), move |result| match result {
Ok(response) => {
if response.ok {
re_log::debug!("Decoding .rrd file from {url:?}…");
decode_rrd(response.bytes, on_msg);
} else {
re_log::error!(
"Failed to fetch .rrd file from {url}: {} {}",
response.status,
response.status_text
);
}
}
Err(err) => {
re_log::error!("Failed to fetch .rrd file from {url}: {err}");
}
});
}

#[cfg(not(target_arch = "wasm32"))]
#[allow(clippy::needless_pass_by_value)] // must match wasm version
fn decode_rrd(rrd_bytes: Vec<u8>, on_msg: Box<dyn Fn(re_log_types::LogMsg) + Send>) {
match re_log_types::encoding::Decoder::new(rrd_bytes.as_slice()) {
Ok(decoder) => {
for msg in decoder {
match msg {
Ok(msg) => {
on_msg(msg);
}
Err(err) => {
re_log::warn_once!("Failed to decode message: {err}");
}
}
}
}
Err(err) => {
re_log::error!("Failed to decode .rrd: {err}");
}
}
}

#[cfg(target_arch = "wasm32")]
mod web_decode {
pub fn decode_rrd(rrd_bytes: Vec<u8>, on_msg: Box<dyn Fn(re_log_types::LogMsg) + Send>) {
wasm_bindgen_futures::spawn_local(decode_rrd_async(rrd_bytes, on_msg))
}

/// Decodes the file in chunks, with an yield between each chunk.
///
/// This is cooperative multi-tasking.
async fn decode_rrd_async(
rrd_bytes: Vec<u8>,
on_msg: Box<dyn Fn(re_log_types::LogMsg) + Send>,
) {
let mut last_yield = instant::Instant::now();

match re_log_types::encoding::Decoder::new(rrd_bytes.as_slice()) {
Ok(decoder) => {
for msg in decoder {
match msg {
Ok(msg) => {
on_msg(msg);
}
Err(err) => {
re_log::warn_once!("Failed to decode message: {err}");
}
}

if last_yield.elapsed() > instant::Duration::from_millis(10) {
// yield to the ui task
yield_().await;
last_yield = instant::Instant::now();
}
}
}
Err(err) => {
re_log::error!("Failed to decode .rrd: {err}");
}
}
}

// Yield to other tasks
async fn yield_() {
sleep_ms(1).await; // TODO(emilk): create a better async yield function
}

// Hack to get async sleep on wasm
async fn sleep_ms(millis: i32) {
let mut cb = |resolve: js_sys::Function, _reject: js_sys::Function| {
web_sys::window()
.unwrap()
.set_timeout_with_callback_and_timeout_and_arguments_0(&resolve, millis)
.expect("Failed to call set_timeout");
};
let p = js_sys::Promise::new(&mut cb);
wasm_bindgen_futures::JsFuture::from(p).await.unwrap();
}
}

#[cfg(target_arch = "wasm32")]
use web_decode::decode_rrd;
3 changes: 2 additions & 1 deletion crates/re_viewer/src/viewer_analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,8 @@ impl ViewerAnalytics {
if let Some(data_source) = &log_db.data_source {
let data_source = match data_source {
re_smart_channel::Source::File { .. } => "file", // .rrd
re_smart_channel::Source::Sdk => "sdk", // show()
re_smart_channel::Source::RrdHttpStream { .. } => "http",
re_smart_channel::Source::Sdk => "sdk", // show()
re_smart_channel::Source::WsClient { .. } => "ws_client", // spawn()
re_smart_channel::Source::TcpServer { .. } => "tcp_server", // connect()
};
Expand Down
Loading

1 comment on commit 5e88054

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rust Benchmark

Benchmark suite Current: 5e88054 Previous: 9ed0f27 Ratio
datastore/insert/batch/rects/insert 567836 ns/iter (± 2239) 553813 ns/iter (± 3033) 1.03
datastore/latest_at/batch/rects/query 1888 ns/iter (± 1) 1850 ns/iter (± 6) 1.02
datastore/latest_at/missing_components/primary 285 ns/iter (± 0) 285 ns/iter (± 0) 1
datastore/latest_at/missing_components/secondaries 440 ns/iter (± 0) 461 ns/iter (± 0) 0.95
datastore/range/batch/rects/query 151144 ns/iter (± 160) 151166 ns/iter (± 366) 1.00
mono_points_arrow/generate_message_bundles 49335728 ns/iter (± 1428753) 49108870 ns/iter (± 1325792) 1.00
mono_points_arrow/generate_messages 136735336 ns/iter (± 1332016) 138380970 ns/iter (± 1187960) 0.99
mono_points_arrow/encode_log_msg 166469447 ns/iter (± 967847) 167031071 ns/iter (± 690931) 1.00
mono_points_arrow/encode_total 356658292 ns/iter (± 1686409) 354720289 ns/iter (± 1394258) 1.01
mono_points_arrow/decode_log_msg 186013297 ns/iter (± 1902305) 185478434 ns/iter (± 911490) 1.00
mono_points_arrow/decode_message_bundles 71881796 ns/iter (± 961244) 72175400 ns/iter (± 1143226) 1.00
mono_points_arrow/decode_total 255405433 ns/iter (± 1840287) 253342661 ns/iter (± 1832284) 1.01
batch_points_arrow/generate_message_bundles 324802 ns/iter (± 805) 326789 ns/iter (± 451) 0.99
batch_points_arrow/generate_messages 6450 ns/iter (± 13) 6476 ns/iter (± 14) 1.00
batch_points_arrow/encode_log_msg 354281 ns/iter (± 1077) 354280 ns/iter (± 1925) 1.00
batch_points_arrow/encode_total 697885 ns/iter (± 2361) 703438 ns/iter (± 2154) 0.99
batch_points_arrow/decode_log_msg 347240 ns/iter (± 742) 351266 ns/iter (± 525) 0.99
batch_points_arrow/decode_message_bundles 2157 ns/iter (± 14) 2134 ns/iter (± 18) 1.01
batch_points_arrow/decode_total 350973 ns/iter (± 855) 356919 ns/iter (± 650) 0.98
arrow_mono_points/insert 6788313820 ns/iter (± 13970042) 6913118708 ns/iter (± 27594054) 0.98
arrow_mono_points/query 1775115 ns/iter (± 15207) 1828751 ns/iter (± 11896) 0.97
arrow_batch_points/insert 2718636 ns/iter (± 9815) 2720885 ns/iter (± 12386) 1.00
arrow_batch_points/query 17012 ns/iter (± 31) 16929 ns/iter (± 23) 1.00
arrow_batch_vecs/insert 42654 ns/iter (± 130) 42885 ns/iter (± 109) 0.99
arrow_batch_vecs/query 506183 ns/iter (± 1278) 506582 ns/iter (± 852) 1.00
tuid/Tuid::random 34 ns/iter (± 0) 34 ns/iter (± 0) 1

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.