Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make sure all log messages are sent when using .serve() #6335

Merged
merged 14 commits into from
May 15, 2024
65 changes: 46 additions & 19 deletions crates/re_sdk/src/web_viewer.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use re_log_types::LogMsg;
use re_web_viewer_server::{WebViewerServer, WebViewerServerPort};
use re_web_viewer_server::{WebViewerServer, WebViewerServerError, WebViewerServerPort};
use re_ws_comms::{RerunServer, RerunServerPort};

// ----------------------------------------------------------------------------

/// Failure to host a web viewer and/or Rerun server.
#[derive(thiserror::Error, Debug)]
pub enum WebViewerSinkError {
/// Failure to host the web viewer.
#[error(transparent)]
WebViewerServer(#[from] re_web_viewer_server::WebViewerServerError),
WebViewerServer(#[from] WebViewerServerError),

/// Failure to host the Rerun WebSocket server.
#[error(transparent)]
Expand All @@ -16,13 +18,15 @@ pub enum WebViewerSinkError {

/// A [`crate::sink::LogSink`] tied to a hosted Rerun web viewer. This internally stores two servers:
/// * A [`re_ws_comms::RerunServer`] to relay messages from the sink to a websocket connection
/// * A [`re_web_viewer_server::WebViewerServer`] to serve the Wasm+HTML
/// * A [`WebViewerServer`] to serve the Wasm+HTML
struct WebViewerSink {
open_browser: bool,

/// Sender to send messages to the [`re_ws_comms::RerunServer`]
sender: re_smart_channel::Sender<LogMsg>,

/// Rerun websocket server.
_rerun_server: RerunServer,
rerun_server: RerunServer,

/// The http server serving wasm & html.
_webviewer_server: WebViewerServer,
Expand Down Expand Up @@ -61,14 +65,48 @@ impl WebViewerSink {
}

Ok(Self {
open_browser,
sender: rerun_tx,
_rerun_server: rerun_server,
rerun_server,
_webviewer_server: webviewer_server,
})
}
}

/// Helper to spawn an instance of the [`re_web_viewer_server::WebViewerServer`].
impl crate::sink::LogSink for WebViewerSink {
fn send(&self, msg: LogMsg) {
if let Err(err) = self.sender.send(msg) {
re_log::error_once!("Failed to send log message to web server: {err}");
}
}

#[inline]
fn flush_blocking(&self) {
if let Err(err) = self.sender.flush_blocking() {
re_log::error_once!("Failed to flush: {err}");
}
}
}

impl Drop for WebViewerSink {
fn drop(&mut self) {
if self.open_browser && self.rerun_server.num_accepted_clients() == 0 {
// For small scripts that execute fast we run the risk of finishing
// before the browser has a chance to connect.
// Let's give it a little more time:
re_log::info!("Sleeping a short while to give the browser time to connect…");
std::thread::sleep(std::time::Duration::from_millis(1000));
emilk marked this conversation as resolved.
Show resolved Hide resolved
}

if self.rerun_server.num_accepted_clients() == 0 {
re_log::info!("Shutting down without any clients ever having connected. Consider sleeping to give them more time to connect");
}
}
}

// ----------------------------------------------------------------------------

/// Helper to spawn an instance of the [`WebViewerServer`].
/// This serves the HTTP+Wasm+JS files that make up the web-viewer.
///
/// Optionally opens a browser with the web-viewer and connects to the provided `target_url`.
Expand All @@ -82,8 +120,8 @@ pub fn host_web_viewer(
force_wgpu_backend: Option<String>,
open_browser: bool,
source_url: &str,
) -> anyhow::Result<re_web_viewer_server::WebViewerServer> {
let web_server = re_web_viewer_server::WebViewerServer::new(bind_ip, web_port)?;
) -> anyhow::Result<WebViewerServer> {
let web_server = WebViewerServer::new(bind_ip, web_port)?;
let http_web_viewer_url = web_server.server_url();

let mut viewer_url = format!("{http_web_viewer_url}?url={source_url}");
Expand All @@ -99,17 +137,6 @@ pub fn host_web_viewer(
Ok(web_server)
}

impl crate::sink::LogSink for WebViewerSink {
fn send(&self, msg: LogMsg) {
if let Err(err) = self.sender.send(msg) {
re_log::error_once!("Failed to send log message to web server: {err}");
}
}

#[inline]
fn flush_blocking(&self) {}
}

// ----------------------------------------------------------------------------

/// Serve log-data over WebSockets and serve a Rerun web viewer over HTTP.
Expand Down
26 changes: 19 additions & 7 deletions crates/re_smart_channel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,17 +195,31 @@ pub(crate) fn smart_channel_with_stats<T: Send>(
/// The payload of a [`SmartMessage`].
///
/// Either data or an end-of-stream marker.
#[derive(Debug)]
pub enum SmartMessagePayload<T: Send> {
/// A message sent down the channel.
Msg(T),

/// When received, flush anything already received and then call the given callback.
Flush {
on_flush_done: Box<dyn FnOnce() + Send>,
},

/// The [`Sender`] has quit.
///
/// `None` indicates the sender left gracefully, an error indicates otherwise.
Quit(Option<Box<dyn std::error::Error + Send>>),
}

impl<T: Send> std::fmt::Debug for SmartMessagePayload<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
SmartMessagePayload::Msg(_) => f.write_str("Msg(_)"),
SmartMessagePayload::Flush { .. } => f.write_str("Flush"),
SmartMessagePayload::Quit(_) => f.write_str("Quit"),
}
}
}

impl<T: Send + PartialEq> PartialEq for SmartMessagePayload<T> {
fn eq(&self, rhs: &Self) -> bool {
match (self, rhs) {
Expand All @@ -224,18 +238,16 @@ pub struct SmartMessage<T: Send> {

impl<T: Send> SmartMessage<T> {
pub fn data(&self) -> Option<&T> {
use SmartMessagePayload::{Msg, Quit};
match &self.payload {
Msg(msg) => Some(msg),
Quit(_) => None,
SmartMessagePayload::Msg(msg) => Some(msg),
SmartMessagePayload::Flush { .. } | SmartMessagePayload::Quit(_) => None,
}
}

pub fn into_data(self) -> Option<T> {
use SmartMessagePayload::{Msg, Quit};
match self.payload {
Msg(msg) => Some(msg),
Quit(_) => None,
SmartMessagePayload::Msg(msg) => Some(msg),
SmartMessagePayload::Flush { .. } | SmartMessagePayload::Quit(_) => None,
}
}
}
Expand Down
21 changes: 20 additions & 1 deletion crates/re_smart_channel/src/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl<T: Send> Sender<T> {
)
.map_err(|SendError(msg)| match msg {
SmartMessagePayload::Msg(msg) => SendError(msg),
SmartMessagePayload::Quit(_) => unreachable!(),
SmartMessagePayload::Flush { .. } | SmartMessagePayload::Quit(_) => unreachable!(),
})
}

Expand All @@ -60,6 +60,25 @@ impl<T: Send> Sender<T> {
.map_err(|SendError(msg)| SendError(msg.payload))
}

/// Blocks until all previously sent messages have been received.
pub fn flush_blocking(&self) -> Result<(), SendError<()>> {
let (tx, rx) = std::sync::mpsc::sync_channel(0); // oneshot
self.tx
.send(SmartMessage {
time: Instant::now(),
source: Arc::clone(&self.source),
payload: SmartMessagePayload::Flush {
on_flush_done: Box::new(move || {
tx.send(()).ok();
}),
},
})
.map_err(|_ignored| SendError(()))?;

// Block:
rx.recv().map_err(|_ignored| SendError(()))
}

/// Used to indicate that a sender has left.
///
/// This sends a message down the channel allowing the receiving end to know whether one of the
Expand Down
6 changes: 6 additions & 0 deletions crates/re_viewer/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -971,6 +971,12 @@ impl App {
while let Some((channel_source, msg)) = self.rx.try_recv() {
let msg = match msg.payload {
re_smart_channel::SmartMessagePayload::Msg(msg) => msg,

re_smart_channel::SmartMessagePayload::Flush { on_flush_done } => {
on_flush_done();
continue;
}

re_smart_channel::SmartMessagePayload::Quit(err) => {
if let Some(err) = err {
let log_msg =
Expand Down
23 changes: 16 additions & 7 deletions crates/re_web_viewer_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
use std::{
fmt::Display,
str::FromStr,
sync::{atomic::AtomicBool, Arc},
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
},
};

pub const DEFAULT_WEB_VIEWER_SERVER_PORT: u16 = 9090;
Expand Down Expand Up @@ -86,6 +89,7 @@ pub struct WebViewerServer {
struct WebViewerServerInner {
server: tiny_http::Server,
shutdown: AtomicBool,
num_wasm_served: AtomicU64,

// NOTE: Optional because it is possible to have the `analytics` feature flag enabled
// while at the same time opting-out of analytics at run-time.
Expand Down Expand Up @@ -119,6 +123,7 @@ impl WebViewerServer {
let inner = Arc::new(WebViewerServerInner {
server,
shutdown,
num_wasm_served: Default::default(),

#[cfg(feature = "analytics")]
analytics: match re_analytics::Analytics::new(std::time::Duration::from_secs(2)) {
Expand Down Expand Up @@ -169,9 +174,12 @@ impl WebViewerServer {
impl Drop for WebViewerServer {
fn drop(&mut self) {
if let Some(thread_handle) = self.thread_handle.take() {
self.inner
.shutdown
.store(true, std::sync::atomic::Ordering::Release);
let num_wasm_served = self.inner.num_wasm_served.load(Ordering::Relaxed);
re_log::debug!(
"Shutting down web server after serving the Wasm {num_wasm_served} time(s)"
);

self.inner.shutdown.store(true, Ordering::Release);
self.inner.server.unblock();
thread_handle.join().ok();
}
Expand All @@ -182,7 +190,7 @@ impl WebViewerServerInner {
fn serve(&self) {
loop {
let request = self.server.recv();
if self.shutdown.load(std::sync::atomic::Ordering::Acquire) {
if self.shutdown.load(Ordering::Acquire) {
return;
}

Expand All @@ -200,8 +208,10 @@ impl WebViewerServerInner {
}
}

#[cfg(feature = "analytics")]
fn on_serve_wasm(&self) {
self.num_wasm_served.fetch_add(1, Ordering::Relaxed);

#[cfg(feature = "analytics")]
if let Some(analytics) = &self.analytics {
analytics.record(re_analytics::event::ServeWasm);
}
Expand Down Expand Up @@ -229,7 +239,6 @@ impl WebViewerServerInner {
"/sw.js" => ("text/javascript", data::SW_JS),
"/re_viewer.js" => ("text/javascript", data::VIEWER_JS),
"/re_viewer_bg.wasm" => {
#[cfg(feature = "analytics")]
self.on_serve_wasm();
("application/wasm", data::VIEWER_WASM)
}
Expand Down
Loading
Loading