Skip to content

Commit

Permalink
re_web_viewer_server no longer needs tokio, split out sync code path
Browse files Browse the repository at this point in the history
  • Loading branch information
Wumpf committed Apr 18, 2024
1 parent 49da25d commit d97ebe2
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 117 deletions.
5 changes: 2 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion crates/re_sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ thiserror.workspace = true
re_data_source = { workspace = true, optional = true }
re_smart_channel = { workspace = true, optional = true }
re_ws_comms = { workspace = true, optional = true }
re_web_viewer_server = { workspace = true, optional = true }
re_web_viewer_server = { workspace = true, optional = true, features = [
"sync",
] }

anyhow = { workspace = true, optional = true }
webbrowser = { workspace = true, optional = true }
Expand Down
15 changes: 7 additions & 8 deletions crates/re_web_viewer_server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ all-features = true


[features]
default = ["sync"]

## ONLY FOR CI!
##
## When set, the crate builds despite the `.wasm` being missing, but will panic at runtime.
Expand All @@ -42,6 +44,9 @@ __ci = []
## Enable telemetry using our analytics SDK.
analytics = ["dep:re_analytics"]

## Enable `WebViewerServerHandle` for use without an async runtime.
sync = ["pollster"]


[dependencies]
re_log = { workspace = true, features = ["setup"] }
Expand All @@ -50,14 +55,8 @@ document-features.workspace = true
futures-util.workspace = true
hyper = { workspace = true, features = ["full"] }
thiserror.workspace = true
tokio = { workspace = true, default-features = false, features = [
"macros",
"rt-multi-thread",
] }

# Only needed for main.rs:
clap = { workspace = true, features = ["derive"] }
webbrowser.workspace = true
futures-channel.workspace = true # Used for server shutdown signaling. Picked this one because it's already used by hyper.

# Optional dependencies:
re_analytics = { workspace = true, optional = true }
pollster = { workspace = true, optional = true }
69 changes: 13 additions & 56 deletions crates/re_web_viewer_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ use std::{
use futures_util::future;
use hyper::{server::conn::AddrIncoming, service::Service, Body, Request, Response};

#[cfg(feature = "sync")]
mod sync;
#[cfg(feature = "sync")]
pub use sync::WebViewerServerHandle;

pub const DEFAULT_WEB_VIEWER_SERVER_PORT: u16 = 9090;

#[cfg(not(feature = "__ci"))]
Expand All @@ -33,18 +38,20 @@ mod data {

/// Failure to host the web viewer.
#[derive(thiserror::Error, Debug)]
#[allow(clippy::enum_variant_names)]
pub enum WebViewerServerError {
#[error("Could not parse address: {0}")]
AddrParseFailed(#[from] std::net::AddrParseError),

#[error("Failed to bind to port {0} for HTTP: {1}")]
BindFailed(WebViewerServerPort, hyper::Error),

#[error("Failed to join web viewer server task: {0}")]
JoinError(#[from] tokio::task::JoinError),

#[error("Failed to serve web viewer: {0}")]
ServeFailed(hyper::Error),

#[cfg(feature = "sync")]
#[error("Failed to spawn web viewer thread: {0}")]
ThreadSpawnFailed(#[from] std::io::Error),
}

struct Svc {
Expand Down Expand Up @@ -229,11 +236,11 @@ impl WebViewerServer {

pub async fn serve_with_graceful_shutdown(
self,
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
shutdown_rx: futures_channel::oneshot::Receiver<()>,
) -> Result<(), WebViewerServerError> {
self.server
.with_graceful_shutdown(async {
shutdown_rx.recv().await.ok();
shutdown_rx.await.ok();
})
.await
.map_err(WebViewerServerError::ServeFailed)?;
Expand All @@ -246,57 +253,7 @@ impl WebViewerServer {
}
}

/// Sync handle for the [`WebViewerServer`]
///
/// When dropped, the server will be shut down.
pub struct WebViewerServerHandle {
local_addr: std::net::SocketAddr,
shutdown_tx: tokio::sync::broadcast::Sender<()>,
}

impl Drop for WebViewerServerHandle {
fn drop(&mut self) {
re_log::info!("Shutting down web server on {}", self.server_url());
self.shutdown_tx.send(()).ok();
}
}

impl WebViewerServerHandle {
/// Create new [`WebViewerServer`] to host the Rerun Web Viewer on a specified port.
/// Returns a [`WebViewerServerHandle`] that will shutdown the server when dropped.
///
/// A port of 0 will let the OS choose a free port.
///
/// The caller needs to ensure that there is a `tokio` runtime running.
pub fn new(
bind_ip: &str,
requested_port: WebViewerServerPort,
) -> Result<Self, WebViewerServerError> {
let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);

let web_server = WebViewerServer::new(bind_ip, requested_port)?;

let local_addr = web_server.server.local_addr();

tokio::spawn(async move { web_server.serve_with_graceful_shutdown(shutdown_rx).await });

let slf = Self {
local_addr,
shutdown_tx,
};

re_log::info!("Started web server on {}", slf.server_url());

Ok(slf)
}

/// Includes `http://` prefix
pub fn server_url(&self) -> String {
server_url(&self.local_addr)
}
}

fn server_url(local_addr: &SocketAddr) -> String {
pub(crate) fn server_url(local_addr: &SocketAddr) -> String {
if local_addr.ip().is_unspecified() {
// "0.0.0.0"
format!("http://localhost:{}", local_addr.port())
Expand Down
49 changes: 0 additions & 49 deletions crates/re_web_viewer_server/src/main.rs

This file was deleted.

70 changes: 70 additions & 0 deletions crates/re_web_viewer_server/src/sync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use crate::{server_url, WebViewerServer, WebViewerServerError, WebViewerServerPort};

/// Sync handle for the [`WebViewerServer`]
///
/// When dropped, the server will be shut down.
pub struct WebViewerServerHandle {
local_addr: std::net::SocketAddr,
shutdown: Option<(
futures_channel::oneshot::Sender<()>,
std::thread::JoinHandle<()>,
)>,
}

impl Drop for WebViewerServerHandle {
fn drop(&mut self) {
re_log::info!("Shutting down web server on {}", self.server_url());

if let Some((shutdown_tx, thread_handle)) = self.shutdown.take() {
if shutdown_tx.send(()).is_err() {
re_log::error!("Failed to send shutdown signal to web server thread.");
}
thread_handle.join().ok();
}
}
}

impl WebViewerServerHandle {
/// Create new [`WebViewerServer`] to host the Rerun Web Viewer on a specified port.
/// Returns a [`WebViewerServerHandle`] that will shutdown the server when dropped.
///
/// A port of 0 will let the OS choose a free port.
///
/// Internally spawns a thread to run the server.
/// If you instead want to use the server in your async runtime (e.g. [`tokio`](https://docs.rs/tokio/latest/tokio/) or [`smol`](https://docs.rs/smol/latest/smol/)), use [`WebViewerServer`].
pub fn new(
bind_ip: &str,
requested_port: WebViewerServerPort,
) -> Result<Self, WebViewerServerError> {
let (shutdown_tx, shutdown_rx) = futures_channel::oneshot::channel();

let web_server = WebViewerServer::new(bind_ip, requested_port)?;

let local_addr = web_server.server.local_addr();

let server_serve_future = async {
if let Err(err) = web_server.serve_with_graceful_shutdown(shutdown_rx).await {
re_log::error!("Web server failed: {err}");
}
};

let thread_handle = std::thread::Builder::new()
.name("WebViewerServerHandle".to_owned())
.spawn(move || pollster::block_on(server_serve_future))
.map_err(WebViewerServerError::ThreadSpawnFailed)?;

let slf = Self {
local_addr,
shutdown: Some((shutdown_tx, thread_handle)),
};

re_log::info!("Started web server on {}", slf.server_url());

Ok(slf)
}

/// Includes `http://` prefix
pub fn server_url(&self) -> String {
server_url(&self.local_addr)
}
}

0 comments on commit d97ebe2

Please sign in to comment.