Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor the relationship between the assorted web / websocket servers #1844

Merged
merged 15 commits into from
Apr 14, 2023
5 changes: 4 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/re_viewer/src/remote_viewer_app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl RemoteViewerApp {
}
}
Err(err) => {
re_log::error!("Failed to parse message: {}", re_error::format(&err));
re_log::error!("Failed to parse message: {err}");
std::ops::ControlFlow::Break(())
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/re_viewer/src/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ fn get_url(info: &eframe::IntegrationInfo) -> String {
url = param.clone();
}
if url.is_empty() {
re_ws_comms::default_server_url(&info.web_info.location.hostname)
re_ws_comms::server_url(&info.web_info.location.hostname, Default::default())
} else {
url
}
Expand Down
2 changes: 1 addition & 1 deletion crates/re_web_viewer_server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ analytics = ["dep:re_analytics"]
[dependencies]
re_log.workspace = true

anyhow.workspace = true
ctrlc.workspace = true
document-features = "0.2"
futures-util = "0.3"
hyper = { version = "0.14", features = ["full"] }
thiserror.workspace = true
tokio = { workspace = true, default-features = false, features = [
"macros",
"rt-multi-thread",
Expand Down
118 changes: 110 additions & 8 deletions crates/re_web_viewer_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,17 @@
#![forbid(unsafe_code)]
#![warn(clippy::all, rust_2018_idioms)]

use std::task::{Context, Poll};
use std::{
fmt::Display,
str::FromStr,
task::{Context, Poll},
};

use futures_util::future;
use hyper::{server::conn::AddrIncoming, service::Service, Body, Request, Response};

pub const DEFAULT_WEB_VIEWER_SERVER_PORT: u16 = 9090;

#[cfg(not(feature = "__ci"))]
mod data {
// If you add/remove/change the paths here, also update the include-list in `Cargo.toml`!
Expand All @@ -32,6 +38,21 @@ mod data {
pub const VIEWER_WASM_RELEASE: &[u8] = include_bytes!("../web_viewer/re_viewer_bg.wasm");
}

#[derive(thiserror::Error, Debug)]
pub enum WebViewerServerError {
#[error("Could not parse address: {0}")]
AddrParseFailed(#[from] std::net::AddrParseError),

#[error("failed to bind to port {0}: {1}")]
BindFailed(WebViewerServerPort, hyper::Error),

#[error("failed to join web viewer server task: {0}")]
JoinError(#[from] tokio::task::JoinError),

#[error("failed to serve web viewer: {0}")]
ServeFailed(hyper::Error),
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!


struct Svc {
// NOTE: Optional because it is possible to have the `analytics` feature flag enabled
// while at the same time opting-out of analytics at run-time.
Expand Down Expand Up @@ -149,27 +170,108 @@ impl<T> Service<T> for MakeSvc {

// ----------------------------------------------------------------------------

/// Hosts the Web Viewer Wasm+HTML
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
/// Typed port for use with [`WebViewerServer`]
pub struct WebViewerServerPort(pub u16);

impl Default for WebViewerServerPort {
fn default() -> Self {
Self(DEFAULT_WEB_VIEWER_SERVER_PORT)
}
}

impl Display for WebViewerServerPort {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}

// Needed for clap
impl FromStr for WebViewerServerPort {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.parse::<u16>() {
Ok(port) => Ok(WebViewerServerPort(port)),
Err(err) => Err(format!("Failed to parse port: {err}")),
}
}
}

/// HTTP host for the Rerun Web Viewer application
/// This serves the HTTP+Wasm+JS files that make up the web-viewer.
pub struct WebViewerServer {
server: hyper::Server<AddrIncoming, MakeSvc>,
}

impl WebViewerServer {
pub fn new(port: u16) -> Self {
let bind_addr = format!("0.0.0.0:{port}").parse().unwrap();
let server = hyper::Server::bind(&bind_addr).serve(MakeSvc);
Self { server }
/// Create new [`WebViewerServer`] to host the Rerun Web Viewer on a specified port.
///
/// A port of 0 will let the OS choose a free port.
pub fn new(port: WebViewerServerPort) -> Result<Self, WebViewerServerError> {
let bind_addr = format!("0.0.0.0:{port}").parse()?;
let server = hyper::Server::try_bind(&bind_addr)
.map_err(|e| WebViewerServerError::BindFailed(port, e))?
.serve(MakeSvc);
Ok(Self { server })
}

pub async fn serve(
self,
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
) -> anyhow::Result<()> {
) -> Result<(), WebViewerServerError> {
self.server
.with_graceful_shutdown(async {
shutdown_rx.recv().await.ok();
})
.await?;
.await
.map_err(WebViewerServerError::ServeFailed)?;
Ok(())
}

pub fn port(&self) -> WebViewerServerPort {
WebViewerServerPort(self.server.local_addr().port())
}
}

/// Sync handle for the [`WebViewerServer`]
///
/// When dropped, the server will be shut down.
pub struct WebViewerServerHandle {
port: WebViewerServerPort,
shutdown_tx: tokio::sync::broadcast::Sender<()>,
}

impl Drop for WebViewerServerHandle {
fn drop(&mut self) {
re_log::info!("Shutting down web server on port {}.", self.port);
self.shutdown_tx.send(()).ok();
}
}

impl WebViewerServerHandle {
/// Create new [`WebViewerServer`] to host the Rerun Web Viewer on a specified port.
/// Returns a [`WebViewerServerHandle`] that will shutdown the server when dropped.
///
/// A port of 0 will let the OS choose a free port.
jleibs marked this conversation as resolved.
Show resolved Hide resolved
///
/// The caller needs to ensure that there is a `tokio` runtime running.
pub fn new(requested_port: WebViewerServerPort) -> Result<Self, WebViewerServerError> {
let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);

let web_server = WebViewerServer::new(requested_port)?;

let port = web_server.port();

tokio::spawn(async move { web_server.serve(shutdown_rx).await });

re_log::info!("Started web server on port {}.", port);

Ok(Self { port, shutdown_tx })
}

/// Get the port where the HTTP server is listening
pub fn port(&self) -> WebViewerServerPort {
self.port
}
}
3 changes: 2 additions & 1 deletion crates/re_web_viewer_server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#[tokio::main]
async fn main() {
re_log::setup_native_logging();
let port = 9090;
let port = Default::default();
eprintln!("Hosting web-viewer on http://127.0.0.1:{port}");

// Shutdown server via Ctrl+C
Expand All @@ -16,6 +16,7 @@ async fn main() {
.expect("Error setting Ctrl-C handler");

re_web_viewer_server::WebViewerServer::new(port)
.expect("Could not create web server")
.serve(shutdown_rx)
.await
.unwrap();
Expand Down
1 change: 1 addition & 0 deletions crates/re_ws_comms/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ re_log_types = { workspace = true, features = ["serde"] }
anyhow.workspace = true
bincode = "1.3"
document-features = "0.2"
thiserror.workspace = true

# Client:
ewebsock = { version = "0.2", optional = true }
Expand Down
3 changes: 2 additions & 1 deletion crates/re_ws_comms/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use std::ops::ControlFlow;

use ewebsock::{WsEvent, WsMessage, WsSender};

use crate::Result;
// TODO(jleibs): use thiserror
pub type Result<T> = anyhow::Result<T>;

/// Represents a connection to the server.
/// Disconnects on drop.
Expand Down
69 changes: 58 additions & 11 deletions crates/re_ws_comms/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,18 @@

#[cfg(feature = "client")]
mod client;
use std::{fmt::Display, str::FromStr};

#[cfg(feature = "client")]
pub use client::Connection;

#[cfg(feature = "server")]
mod server;
#[cfg(feature = "server")]
pub use server::Server;
pub use server::{RerunServer, RerunServerHandle};

use re_log_types::LogMsg;

pub type Result<T> = anyhow::Result<T>;

pub const DEFAULT_WS_SERVER_PORT: u16 = 9877;

#[cfg(feature = "tls")]
Expand All @@ -26,8 +26,58 @@ pub const PROTOCOL: &str = "wss";
#[cfg(not(feature = "tls"))]
pub const PROTOCOL: &str = "ws";

pub fn default_server_url(hostname: &str) -> String {
format!("{PROTOCOL}://{hostname}:{DEFAULT_WS_SERVER_PORT}")
// ----------------------------------------------------------------------------

#[derive(thiserror::Error, Debug)]
pub enum RerunServerError {
#[error("failed to bind to port {0}: {1}")]
BindFailed(RerunServerPort, std::io::Error),

#[error("received an invalid message")]
InvalidMessagePrefix,

#[error("received an invalid message")]
InvalidMessage(#[from] bincode::Error),

#[cfg(feature = "server")]
#[error("failed to join web viewer server task: {0}")]
JoinError(#[from] tokio::task::JoinError),

#[cfg(feature = "server")]
#[error("tokio error: {0}")]
TokioIoError(#[from] tokio::io::Error),
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
/// Typed port for use with [`RerunServer`]
pub struct RerunServerPort(pub u16);

impl Default for RerunServerPort {
fn default() -> Self {
Self(DEFAULT_WS_SERVER_PORT)
}
}

impl Display for RerunServerPort {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}

// Needed for clap
impl FromStr for RerunServerPort {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.parse::<u16>() {
Ok(port) => Ok(RerunServerPort(port)),
Err(err) => Err(format!("Failed to parse port: {err}")),
}
}
}

pub fn server_url(hostname: &str, port: RerunServerPort) -> String {
format!("{PROTOCOL}://{hostname}:{port}")
}

const PREFIX: [u8; 4] = *b"RR00";
Expand All @@ -41,14 +91,11 @@ pub fn encode_log_msg(log_msg: &LogMsg) -> Vec<u8> {
bytes
}

pub fn decode_log_msg(data: &[u8]) -> Result<LogMsg> {
pub fn decode_log_msg(data: &[u8]) -> Result<LogMsg, RerunServerError> {
let payload = data
.strip_prefix(&PREFIX)
.ok_or_else(|| anyhow::format_err!("Message didn't start with the correct prefix"))?;
.ok_or(RerunServerError::InvalidMessagePrefix)?;

use anyhow::Context as _;
use bincode::Options as _;
bincode::DefaultOptions::new()
.deserialize(payload)
.context("bincode")
Ok(bincode::DefaultOptions::new().deserialize(payload)?)
}
Loading