Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

re_web_viewer_server no longer needs tokio, split out sync code path #6030

Merged
merged 2 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion crates/re_sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ thiserror.workspace = true
re_data_source = { workspace = true, optional = true }
re_smart_channel = { workspace = true, optional = true }
re_ws_comms = { workspace = true, optional = true }
re_web_viewer_server = { workspace = true, optional = true }
re_web_viewer_server = { workspace = true, optional = true, features = [
"sync",
] }

anyhow = { workspace = true, optional = true }
webbrowser = { workspace = true, optional = true }
Expand Down
15 changes: 7 additions & 8 deletions crates/re_web_viewer_server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ all-features = true


[features]
default = ["sync"]

## ONLY FOR CI!
##
## When set, the crate builds despite the `.wasm` being missing, but will panic at runtime.
Expand All @@ -42,6 +44,9 @@ __ci = []
## Enable telemetry using our analytics SDK.
analytics = ["dep:re_analytics"]

## Enable `WebViewerServerHandle` for use without an async runtime.
sync = ["pollster"]


[dependencies]
re_log = { workspace = true, features = ["setup"] }
Expand All @@ -50,14 +55,8 @@ document-features.workspace = true
futures-util.workspace = true
hyper = { workspace = true, features = ["full"] }
thiserror.workspace = true
tokio = { workspace = true, default-features = false, features = [
"macros",
"rt-multi-thread",
] }

# Only needed for main.rs:
clap = { workspace = true, features = ["derive"] }
webbrowser.workspace = true
futures-channel.workspace = true # Used for server shutdown signaling. Picked this one because it's already used by hyper.

# Optional dependencies:
re_analytics = { workspace = true, optional = true }
pollster = { workspace = true, optional = true }
69 changes: 13 additions & 56 deletions crates/re_web_viewer_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ use std::{
use futures_util::future;
use hyper::{server::conn::AddrIncoming, service::Service, Body, Request, Response};

#[cfg(feature = "sync")]
mod sync;
#[cfg(feature = "sync")]
pub use sync::WebViewerServerHandle;

pub const DEFAULT_WEB_VIEWER_SERVER_PORT: u16 = 9090;

#[cfg(not(feature = "__ci"))]
Expand All @@ -33,18 +38,20 @@ mod data {

/// Failure to host the web viewer.
#[derive(thiserror::Error, Debug)]
#[allow(clippy::enum_variant_names)]
pub enum WebViewerServerError {
#[error("Could not parse address: {0}")]
AddrParseFailed(#[from] std::net::AddrParseError),

#[error("Failed to bind to port {0} for HTTP: {1}")]
BindFailed(WebViewerServerPort, hyper::Error),

#[error("Failed to join web viewer server task: {0}")]
JoinError(#[from] tokio::task::JoinError),

#[error("Failed to serve web viewer: {0}")]
ServeFailed(hyper::Error),

#[cfg(feature = "sync")]
#[error("Failed to spawn web viewer thread: {0}")]
ThreadSpawnFailed(#[from] std::io::Error),
}

struct Svc {
Expand Down Expand Up @@ -229,11 +236,11 @@ impl WebViewerServer {

pub async fn serve_with_graceful_shutdown(
self,
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
shutdown_rx: futures_channel::oneshot::Receiver<()>,
) -> Result<(), WebViewerServerError> {
self.server
.with_graceful_shutdown(async {
shutdown_rx.recv().await.ok();
shutdown_rx.await.ok();
})
.await
.map_err(WebViewerServerError::ServeFailed)?;
Expand All @@ -246,57 +253,7 @@ impl WebViewerServer {
}
}

/// Sync handle for the [`WebViewerServer`]
///
/// When dropped, the server will be shut down.
pub struct WebViewerServerHandle {
local_addr: std::net::SocketAddr,
shutdown_tx: tokio::sync::broadcast::Sender<()>,
}

impl Drop for WebViewerServerHandle {
fn drop(&mut self) {
re_log::info!("Shutting down web server on {}", self.server_url());
self.shutdown_tx.send(()).ok();
}
}

impl WebViewerServerHandle {
/// Create new [`WebViewerServer`] to host the Rerun Web Viewer on a specified port.
/// Returns a [`WebViewerServerHandle`] that will shutdown the server when dropped.
///
/// A port of 0 will let the OS choose a free port.
///
/// The caller needs to ensure that there is a `tokio` runtime running.
pub fn new(
bind_ip: &str,
requested_port: WebViewerServerPort,
) -> Result<Self, WebViewerServerError> {
let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);

let web_server = WebViewerServer::new(bind_ip, requested_port)?;

let local_addr = web_server.server.local_addr();

tokio::spawn(async move { web_server.serve_with_graceful_shutdown(shutdown_rx).await });

let slf = Self {
local_addr,
shutdown_tx,
};

re_log::info!("Started web server on {}", slf.server_url());

Ok(slf)
}

/// Includes `http://` prefix
pub fn server_url(&self) -> String {
server_url(&self.local_addr)
}
}

fn server_url(local_addr: &SocketAddr) -> String {
pub(crate) fn server_url(local_addr: &SocketAddr) -> String {
if local_addr.ip().is_unspecified() {
// "0.0.0.0"
format!("http://localhost:{}", local_addr.port())
Expand Down
49 changes: 0 additions & 49 deletions crates/re_web_viewer_server/src/main.rs

This file was deleted.

70 changes: 70 additions & 0 deletions crates/re_web_viewer_server/src/sync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use crate::{server_url, WebViewerServer, WebViewerServerError, WebViewerServerPort};

/// Sync handle for the [`WebViewerServer`]
///
/// When dropped, the server will be shut down.
pub struct WebViewerServerHandle {
local_addr: std::net::SocketAddr,
shutdown: Option<(
futures_channel::oneshot::Sender<()>,
std::thread::JoinHandle<()>,
)>,
}

impl Drop for WebViewerServerHandle {
fn drop(&mut self) {
re_log::info!("Shutting down web server on {}", self.server_url());

if let Some((shutdown_tx, thread_handle)) = self.shutdown.take() {
if shutdown_tx.send(()).is_err() {
re_log::error!("Failed to send shutdown signal to web server thread.");
}
thread_handle.join().ok();
}
}
}

impl WebViewerServerHandle {
/// Create new [`WebViewerServer`] to host the Rerun Web Viewer on a specified port.
/// Returns a [`WebViewerServerHandle`] that will shutdown the server when dropped.
///
/// A port of 0 will let the OS choose a free port.
///
/// Internally spawns a thread to run the server.
/// If you instead want to use the server in your async runtime (e.g. [`tokio`](https://docs.rs/tokio/latest/tokio/) or [`smol`](https://docs.rs/smol/latest/smol/)), use [`WebViewerServer`].
pub fn new(
bind_ip: &str,
requested_port: WebViewerServerPort,
) -> Result<Self, WebViewerServerError> {
let (shutdown_tx, shutdown_rx) = futures_channel::oneshot::channel();

let web_server = WebViewerServer::new(bind_ip, requested_port)?;

let local_addr = web_server.server.local_addr();

let server_serve_future = async {
if let Err(err) = web_server.serve_with_graceful_shutdown(shutdown_rx).await {
re_log::error!("Web server failed: {err}");
}
};

let thread_handle = std::thread::Builder::new()
.name("WebViewerServerHandle".to_owned())
.spawn(move || pollster::block_on(server_serve_future))
.map_err(WebViewerServerError::ThreadSpawnFailed)?;

let slf = Self {
local_addr,
shutdown: Some((shutdown_tx, thread_handle)),
};

re_log::info!("Started web server on {}", slf.server_url());

Ok(slf)
}

/// Includes `http://` prefix
pub fn server_url(&self) -> String {
server_url(&self.local_addr)
}
}
2 changes: 1 addition & 1 deletion examples/rust/minimal_serve/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ publish = false

[dependencies]
rerun = { path = "../../../crates/rerun", features = ["web_viewer"] }
tokio = "1.0"
tokio = { version = "1.24", features = ["rt-multi-thread"] }
Loading