Skip to content

Commit

Permalink
Refactor the relationship between the servers and handles
Browse files Browse the repository at this point in the history
  • Loading branch information
jleibs committed Apr 14, 2023
1 parent 31237fe commit ee7f208
Show file tree
Hide file tree
Showing 16 changed files with 225 additions and 130 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion crates/re_viewer/src/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,10 @@ fn get_url(info: &eframe::IntegrationInfo) -> String {
url = param.clone();
}
if url.is_empty() {
re_ws_comms::default_server_url(&info.web_info.location.hostname)
re_ws_comms::server_url(
&info.web_info.location.hostname,
re_ws_comms::DEFAULT_WS_SERVER_PORT,
)
} else {
url
}
Expand Down
1 change: 1 addition & 0 deletions crates/re_web_viewer_server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ ctrlc.workspace = true
document-features = "0.2"
futures-util = "0.3"
hyper = { version = "0.14", features = ["full"] }
thiserror.workspace = true
tokio = { workspace = true, default-features = false, features = [
"macros",
"rt-multi-thread",
Expand Down
73 changes: 66 additions & 7 deletions crates/re_web_viewer_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use std::task::{Context, Poll};
use futures_util::future;
use hyper::{server::conn::AddrIncoming, service::Service, Body, Request, Response};

pub const DEFAULT_WEB_VIEWER_PORT: u16 = 9090;

#[cfg(not(feature = "__ci"))]
mod data {
// If you add/remove/change the paths here, also update the include-list in `Cargo.toml`!
Expand All @@ -32,6 +34,18 @@ mod data {
pub const VIEWER_WASM_RELEASE: &[u8] = include_bytes!("../web_viewer/re_viewer_bg.wasm");
}

#[derive(thiserror::Error, Debug)]
pub enum WebViewerServerError {
#[error("Could not parse address: {0}")]
AddrParseFailed(#[from] std::net::AddrParseError),
#[error("failed to bind to port {0}: {1}")]
BindFailed(u16, hyper::Error),
#[error("failed to join web viewer server task: {0}")]
JoinError(#[from] tokio::task::JoinError),
#[error("failed to serve web viewer: {0}")]
ServeFailed(hyper::Error),
}

struct Svc {
// NOTE: Optional because it is possible to have the `analytics` feature flag enabled
// while at the same time opting-out of analytics at run-time.
Expand Down Expand Up @@ -149,27 +163,72 @@ impl<T> Service<T> for MakeSvc {

// ----------------------------------------------------------------------------

/// Hosts the Web Viewer Wasm+HTML
/// HTTP host for the Rerun Web Viewer application
/// This serves the HTTP+WASM+JS files that make up the web-viewer.
pub struct WebViewerServer {
server: hyper::Server<AddrIncoming, MakeSvc>,
}

impl WebViewerServer {
pub fn new(port: u16) -> Self {
let bind_addr = format!("0.0.0.0:{port}").parse().unwrap();
let server = hyper::Server::bind(&bind_addr).serve(MakeSvc);
Self { server }
pub fn new(port: u16) -> Result<Self, WebViewerServerError> {
let bind_addr = format!("0.0.0.0:{port}").parse()?;
let server = hyper::Server::try_bind(&bind_addr)
.map_err(|e| WebViewerServerError::BindFailed(port, e))?
.serve(MakeSvc);
Ok(Self { server })
}

pub async fn serve(
self,
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
) -> anyhow::Result<()> {
) -> Result<(), WebViewerServerError> {
self.server
.with_graceful_shutdown(async {
shutdown_rx.recv().await.ok();
})
.await?;
.await
.map_err(WebViewerServerError::ServeFailed)?;
Ok(())
}
}

/// Sync handle for the [`WebViewerServer`]
///
/// When dropped, the server will be shut down.
pub struct WebViewerServerHandle {
port: u16,
shutdown_tx: tokio::sync::broadcast::Sender<()>,
}

impl Drop for WebViewerServerHandle {
fn drop(&mut self) {
re_log::info!("Shutting down web server on port {}.", self.port);
self.shutdown_tx.send(()).ok();
}
}

impl WebViewerServerHandle {
/// Create new [`WebViewerServer`] to host the Rerun Web Viewer on a specified port.
///
/// A port of 0 will let the OS choose a free port.
///
/// The caller needs to ensure that there is a `tokio` runtime running.
pub fn new(requested_port: u16) -> Result<Self, WebViewerServerError> {
let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);

let web_server = WebViewerServer::new(requested_port)?;

let port = web_server.server.local_addr().port();

tokio::spawn(async move { web_server.serve(shutdown_rx).await });

re_log::info!("Started web server on port {}.", port);

Ok(Self { port, shutdown_tx })
}

/// Get the port where the web assets are hosted
pub fn port(&self) -> u16 {
self.port
}
}
1 change: 1 addition & 0 deletions crates/re_web_viewer_server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ async fn main() {
.expect("Error setting Ctrl-C handler");

re_web_viewer_server::WebViewerServer::new(port)
.expect("Could not create web server")
.serve(shutdown_rx)
.await
.unwrap();
Expand Down
1 change: 1 addition & 0 deletions crates/re_ws_comms/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ futures-util = { version = "0.3", optional = true, default-features = false, fea
"std",
] }
parking_lot = { workspace = true, optional = true }
thiserror.workspace = true
tokio-tungstenite = { version = "0.17.1", optional = true }
tokio = { workspace = true, optional = true, default-features = false, features = [
"io-std",
Expand Down
6 changes: 3 additions & 3 deletions crates/re_ws_comms/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub use client::Connection;
#[cfg(feature = "server")]
mod server;
#[cfg(feature = "server")]
pub use server::Server;
pub use server::{RerunServer, RerunServerHandle};

use re_log_types::LogMsg;

Expand All @@ -26,8 +26,8 @@ pub const PROTOCOL: &str = "wss";
#[cfg(not(feature = "tls"))]
pub const PROTOCOL: &str = "ws";

pub fn default_server_url(hostname: &str) -> String {
format!("{PROTOCOL}://{hostname}:{DEFAULT_WS_SERVER_PORT}")
pub fn server_url(hostname: &str, port: u16) -> String {
format!("{PROTOCOL}://{hostname}:{port}")
}

const PREFIX: [u8; 4] = *b"RR00";
Expand Down
75 changes: 63 additions & 12 deletions crates/re_ws_comms/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,32 @@ use tokio_tungstenite::{accept_async, tungstenite::Error};
use re_log_types::LogMsg;
use re_smart_channel::Receiver;

use crate::server_url;

// ----------------------------------------------------------------------------

pub struct Server {
#[derive(thiserror::Error, Debug)]
pub enum RerunServerError {
#[error("failed to bind to port {0}: {1}")]
BindFailed(u16, std::io::Error),
#[error("failed to join web viewer server task: {0}")]
JoinError(#[from] tokio::task::JoinError),
#[error("tokio error: {0}")]
TokioIoError(#[from] tokio::io::Error),
}

pub struct RerunServer {
listener: TcpListener,
}

impl Server {
impl RerunServer {
/// Start a pub-sub server listening on the given port
pub async fn new(port: u16) -> anyhow::Result<Self> {
use anyhow::Context as _;

pub async fn new(port: u16) -> Result<Self, RerunServerError> {
let bind_addr = format!("0.0.0.0:{port}");

let listener = TcpListener::bind(&bind_addr)
.await
.with_context(|| format!("Can't listen on {bind_addr:?}"))?;
.map_err(|e| RerunServerError::BindError(port, e))?;

re_log::info!(
"Listening for websocket traffic on {bind_addr}. Connect with a web Rerun Viewer."
Expand All @@ -45,9 +55,7 @@ impl Server {
self,
rx: Receiver<LogMsg>,
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
) -> anyhow::Result<()> {
use anyhow::Context as _;

) -> Result<(), RerunServerError> {
let history = Arc::new(Mutex::new(Vec::new()));

let log_stream = to_broadcast_stream(rx, history.clone());
Expand All @@ -60,9 +68,7 @@ impl Server {
}
};

let peer = tcp_stream
.peer_addr()
.context("connected streams should have a peer address")?;
let peer = tcp_stream.peer_addr()?;
tokio::spawn(accept_connection(
log_stream.clone(),
peer,
Expand All @@ -73,6 +79,51 @@ impl Server {
}
}

pub struct RerunServerHandle {
port: u16,
shutdown_tx: tokio::sync::broadcast::Sender<()>,
}

impl Drop for RerunServerHandle {
fn drop(&mut self) {
re_log::info!("Shutting down Rerun server on port {}.", self.port);
self.shutdown_tx.send(()).ok();
}
}

impl RerunServerHandle {
/// Create new [`RerunServer`] to relay [`LogMsg`]s to a web viewer.
///
/// A port of 0 will let the OS choose a free port.
///
/// The caller needs to ensure that there is a `tokio` runtime running.
pub fn new(rerun_rx: Receiver<LogMsg>, requested_port: u16) -> Result<Self, RerunServerError> {
let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);

let rt = tokio::runtime::Handle::current();

let ws_server = rt.block_on(tokio::spawn(async move {
let ws_server = RerunServer::new(requested_port).await;
ws_server
}))??;

let port = ws_server.listener.local_addr()?.port();

tokio::spawn(async move { ws_server.listen(rerun_rx, shutdown_rx).await });

Ok(Self { port, shutdown_tx })
}

/// Get the port where the web assets are hosted
pub fn port(&self) -> u16 {
self.port
}

pub fn server_url(&self) -> String {
server_url("localhost", self.port)
}
}

fn to_broadcast_stream(
log_rx: Receiver<LogMsg>,
history: Arc<Mutex<Vec<Arc<[u8]>>>>,
Expand Down
2 changes: 1 addition & 1 deletion crates/rerun/src/clap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl RerunArgs {
#[cfg(feature = "web_viewer")]
RerunBehavior::Serve => {
let open_browser = true;
crate::web_viewer::new_sink(open_browser)
crate::web_viewer::new_sink(open_browser)?
}

#[cfg(feature = "native_viewer")]
Expand Down
27 changes: 20 additions & 7 deletions crates/rerun/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,15 @@ async fn run_impl(
if args.web_viewer {
#[cfg(feature = "web_viewer")]
{
let web_viewer =
host_web_viewer(true, rerun_server_ws_url, shutdown_rx.resubscribe());
let web_port = re_web_viewer_server::DEFAULT_WEB_VIEWER_PORT;
let web_viewer = host_web_viewer(
web_port,
true,
rerun_server_ws_url,
shutdown_rx.resubscribe(),
);
// We return here because the running [`WebViewerServer`] is all we need.
// The page we open will be pointed at a websocket url hosted by a *different* server.
return web_viewer.await;
}
#[cfg(not(feature = "web_viewer"))]
Expand Down Expand Up @@ -369,17 +376,23 @@ async fn run_impl(
let shutdown_web_viewer = shutdown_rx.resubscribe();

// This is the server which the web viewer will talk to:
let ws_server = re_ws_comms::Server::new(re_ws_comms::DEFAULT_WS_SERVER_PORT).await?;
let ws_server =
re_ws_comms::RerunServer::new(re_ws_comms::DEFAULT_WS_SERVER_PORT).await?;
let ws_server_handle = tokio::spawn(ws_server.listen(rx, shutdown_ws_server));
let ws_server_url = re_ws_comms::default_server_url("127.0.0.1");
let ws_server_url =
re_ws_comms::server_url("127.0.0.1", re_ws_comms::DEFAULT_WS_SERVER_PORT);

// This is the server that serves the Wasm+HTML:
let web_server_handle =
tokio::spawn(host_web_viewer(true, ws_server_url, shutdown_web_viewer));
let web_server_handle = tokio::spawn(host_web_viewer(
re_web_viewer_server::DEFAULT_WEB_VIEWER_PORT,
true,
ws_server_url,
shutdown_web_viewer,
));

// Wait for both servers to shutdown.
web_server_handle.await?.ok();
return ws_server_handle.await?;
return ws_server_handle.await?.map_err(anyhow::Error::from);
}

#[cfg(not(feature = "web_viewer"))]
Expand Down
Loading

0 comments on commit ee7f208

Please sign in to comment.