Skip to content

Commit

Permalink
Fix crash on serve exit, second attempt (#1633)
Browse files Browse the repository at this point in the history
* Revert "Revert "Handle ctrl+c to gracefully shutdown the server(s) (#1613)" (#1632)"

This reverts commit 93ab88b.

* pass shutdown bool to app instead of installing another ctrl+c handler
  • Loading branch information
Wumpf authored Mar 21, 2023
1 parent 068428b commit 7c36951
Show file tree
Hide file tree
Showing 20 changed files with 233 additions and 151 deletions.
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ arrow2 = "0.16"
arrow2_convert = "0.4.2"
clap = "4.0"
comfy-table = { version = "6.1", default-features = false }
ctrlc = { version = "3.0", features = ["termination"] }
ecolor = "0.21.0"
eframe = { version = "0.21.3", default-features = false }
egui = "0.21.0"
Expand Down
1 change: 1 addition & 0 deletions crates/re_sdk_comms/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,4 @@ bincode = "1.3"
crossbeam = "0.8"
document-features = "0.2"
rand = { version = "0.8.5", features = ["small_rng"] }
tokio.workspace = true
123 changes: 67 additions & 56 deletions crates/re_sdk_comms/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use rand::{Rng as _, SeedableRng};

use re_log_types::{LogMsg, TimePoint, TimeType, TimelineName};
use re_smart_channel::{Receiver, Sender};
use tokio::net::{TcpListener, TcpStream};

#[derive(Clone, Copy, Debug, PartialEq)]
pub struct ServerOptions {
Expand All @@ -27,37 +28,18 @@ impl Default for ServerOptions {
}
}

/// Listen to multiple SDK:s connecting to us over TCP.
///
/// ``` no_run
/// # use re_sdk_comms::{serve, ServerOptions};
/// let log_msg_rx = serve(80, ServerOptions::default())?;
/// # Ok::<(), anyhow::Error>(())
/// ```
pub fn serve(port: u16, options: ServerOptions) -> anyhow::Result<Receiver<LogMsg>> {
async fn listen_for_new_clients(
port: u16,
options: ServerOptions,
tx: Sender<LogMsg>,
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
) {
let bind_addr = format!("0.0.0.0:{port}");

let listener = std::net::TcpListener::bind(&bind_addr)
.with_context(|| format!("Failed to bind TCP address {bind_addr:?} for our WS server."))?;

let (tx, rx) = re_smart_channel::smart_channel(re_smart_channel::Source::TcpServer { port });

std::thread::Builder::new()
.name("sdk-server".into())
.spawn(move || {
for stream in listener.incoming() {
match stream {
Ok(stream) => {
let tx = tx.clone();
spawn_client(stream, tx, options);
}
Err(err) => {
re_log::warn!("Failed to accept incoming SDK client: {err}");
}
}
}
})
.expect("Failed to spawn thread");
let listener = TcpListener::bind(&bind_addr)
.await
.with_context(|| format!("Failed to bind TCP address {bind_addr:?}"))
.unwrap();

if options.quiet {
re_log::debug!(
Expand All @@ -69,43 +51,72 @@ pub fn serve(port: u16, options: ServerOptions) -> anyhow::Result<Receiver<LogMs
);
}

Ok(rx)
loop {
let incoming = tokio::select! {
res = listener.accept() => res,
_ = shutdown_rx.recv() => {
return;
}
};
match incoming {
Ok((stream, _)) => {
let tx = tx.clone();
spawn_client(stream, tx, options);
}
Err(err) => {
re_log::warn!("Failed to accept incoming SDK client: {err}");
}
}
}
}

fn spawn_client(stream: std::net::TcpStream, tx: Sender<LogMsg>, options: ServerOptions) {
std::thread::Builder::new()
.name(format!(
"sdk-server-client-handler-{:?}",
stream.peer_addr()
))
.spawn(move || {
let addr_string = stream
.peer_addr()
.map_or_else(|_| "(unknown ip)".to_owned(), |addr| addr.to_string());
if options.quiet {
re_log::debug!("New SDK client connected: {addr_string}");
} else {
re_log::info!("New SDK client connected: {addr_string}");
}
/// Listen to multiple SDK:s connecting to us over TCP.
///
/// ``` no_run
/// # use re_sdk_comms::{serve, ServerOptions};
/// let (sender, receiver) = tokio::sync::broadcast::channel(1);
/// let log_msg_rx = serve(80, ServerOptions::default(), receiver)?;
/// # Ok::<(), anyhow::Error>(())
/// ```
pub fn serve(
port: u16,
options: ServerOptions,
shutdown_rx: tokio::sync::broadcast::Receiver<()>,
) -> anyhow::Result<Receiver<LogMsg>> {
let (tx, rx) = re_smart_channel::smart_channel(re_smart_channel::Source::TcpServer { port });

if let Err(err) = run_client(stream, &tx, options) {
re_log::warn!("Closing connection to client: {err}");
}
})
.expect("Failed to spawn thread");
tokio::spawn(listen_for_new_clients(port, options, tx, shutdown_rx));

Ok(rx)
}

/// Spawns a tokio task that services one connected SDK client.
///
/// The task first logs the client's peer address, then awaits `run_client` on the stream.
/// Any error returned by `run_client` is logged as a warning and not propagated further.
///
/// * `stream` - the accepted TCP connection; moved into the spawned task.
/// * `tx` - channel sender handed to `run_client` by reference.
/// * `options` - `options.quiet` selects debug-level instead of info-level for the
///   "new client" log line; the options are also forwarded to `run_client`.
///
/// The handle returned by `tokio::spawn` is discarded, so the task is detached and this
/// function returns immediately.
// NOTE(review): presumably this must be called from within a tokio runtime context,
// since it calls `tokio::spawn` directly — confirm against callers.
fn spawn_client(stream: TcpStream, tx: Sender<LogMsg>, options: ServerOptions) {
tokio::spawn(async move {
// Fall back to a placeholder if the peer address cannot be read.
let addr_string = stream
.peer_addr()
.map_or_else(|_| "(unknown ip)".to_owned(), |addr| addr.to_string());
// In quiet mode the connection notice is demoted from info to debug.
if options.quiet {
re_log::debug!("New SDK client connected: {addr_string}");
} else {
re_log::info!("New SDK client connected: {addr_string}");
}
// A failing client only ends this task; the error is reported as a warning.
if let Err(err) = run_client(stream, &tx, options).await {
re_log::warn!("Closing connection to client: {err}");
}
});
}

fn run_client(
mut stream: std::net::TcpStream,
async fn run_client(
mut stream: TcpStream,
tx: &Sender<LogMsg>,
options: ServerOptions,
) -> anyhow::Result<()> {
#![allow(clippy::read_zero_byte_vec)] // false positive: https://github.com/rust-lang/rust-clippy/issues/9274

use std::io::Read as _;
use tokio::io::AsyncReadExt as _;

let mut client_version = [0_u8; 2];
stream.read_exact(&mut client_version)?;
stream.read_exact(&mut client_version).await?;
let client_version = u16::from_le_bytes(client_version);

match client_version.cmp(&crate::PROTOCOL_VERSION) {
Expand All @@ -132,11 +143,11 @@ fn run_client(

loop {
let mut packet_size = [0_u8; 4];
stream.read_exact(&mut packet_size)?;
stream.read_exact(&mut packet_size).await?;
let packet_size = u32::from_le_bytes(packet_size);

packet.resize(packet_size as usize, 0_u8);
stream.read_exact(&mut packet)?;
stream.read_exact(&mut packet).await?;

re_log::trace!("Received log message of size {packet_size}.");

Expand Down
1 change: 0 additions & 1 deletion crates/re_viewer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ wgpu.workspace = true
arboard = { version = "3.2", default-features = false, features = [
"image-data",
] }
ctrlc = { version = "3.0", features = ["termination"] }
puffin_http = "0.11"
puffin.workspace = true

Expand Down
29 changes: 5 additions & 24 deletions crates/re_viewer/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ pub struct App {
state: AppState,

/// Set to `true` on Ctrl-C.
#[cfg(not(target_arch = "wasm32"))]
ctrl_c: std::sync::Arc<std::sync::atomic::AtomicBool>,
shutdown: std::sync::Arc<std::sync::atomic::AtomicBool>,

/// Pending background tasks, using `poll_promise`.
pending_promises: HashMap<String, Promise<Box<dyn Any + Send>>>,
Expand Down Expand Up @@ -109,28 +108,11 @@ impl App {
re_ui: re_ui::ReUi,
storage: Option<&dyn eframe::Storage>,
rx: Receiver<LogMsg>,
shutdown: std::sync::Arc<std::sync::atomic::AtomicBool>,
) -> Self {
#[cfg(not(target_arch = "wasm32"))]
let ctrl_c = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));

let (logger, text_log_rx) = re_log::ChannelLogger::new(re_log::LevelFilter::Info);
re_log::add_boxed_logger(Box::new(logger));

#[cfg(not(target_arch = "wasm32"))]
{
// Close viewer on Ctrl-C. TODO(emilk): maybe add to `eframe`?

let ctrl_c = ctrl_c.clone();
let egui_ctx = re_ui.egui_ctx.clone();

ctrlc::set_handler(move || {
re_log::debug!("Ctrl-C detected - Closing viewer.");
ctrl_c.store(true, std::sync::atomic::Ordering::SeqCst);
egui_ctx.request_repaint(); // so that we notice that we should close
})
.expect("Error setting Ctrl-C handler");
}

let state: AppState = storage
.and_then(|storage| eframe::get_value(storage, eframe::APP_KEY))
.unwrap_or_default();
Expand All @@ -147,8 +129,7 @@ impl App {
rx,
log_dbs: Default::default(),
state,
#[cfg(not(target_arch = "wasm32"))]
ctrl_c,
shutdown,
pending_promises: Default::default(),
toasts: toasts::Toasts::new(),
latest_memory_purge: instant::Instant::now(), // TODO(emilk): `Instant::MIN` when we have our own `Instant` that supports it.
Expand Down Expand Up @@ -436,8 +417,8 @@ impl eframe::App for App {
self.icon_status = setup_app_icon();
}

#[cfg(not(target_arch = "wasm32"))]
if self.ctrl_c.load(std::sync::atomic::Ordering::Relaxed) {
if self.shutdown.load(std::sync::atomic::Ordering::Relaxed) {
#[cfg(not(target_arch = "wasm32"))]
frame.close();
return;
}
Expand Down
1 change: 1 addition & 0 deletions crates/re_viewer/src/native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pub fn run_native_viewer_with_messages(
re_ui,
cc.storage,
rx,
std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
))
}))
}
1 change: 1 addition & 0 deletions crates/re_viewer/src/remote_viewer_app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ impl RemoteViewerApp {
self.re_ui.clone(),
storage,
rx,
std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
);

self.app = Some((connection, app));
Expand Down
1 change: 1 addition & 0 deletions crates/re_viewer/src/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub async fn start(
re_ui,
cc.storage,
rx,
std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
))
}
EndpointCategory::WebSocket(url) => {
Expand Down
1 change: 1 addition & 0 deletions crates/re_web_viewer_server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ analytics = ["dep:re_analytics"]
re_log.workspace = true

anyhow.workspace = true
ctrlc.workspace = true
document-features = "0.2"
futures-util = "0.3"
hyper = { version = "0.14", features = ["full"] }
Expand Down
11 changes: 9 additions & 2 deletions crates/re_web_viewer_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,15 @@ impl WebViewerServer {
Self { server }
}

pub async fn serve(self) -> anyhow::Result<()> {
self.server.await?;
pub async fn serve(
self,
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
) -> anyhow::Result<()> {
self.server
.with_graceful_shutdown(async {
shutdown_rx.recv().await.ok();
})
.await?;
Ok(())
}
}
11 changes: 10 additions & 1 deletion crates/re_web_viewer_server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,17 @@ async fn main() {
re_log::setup_native_logging();
let port = 9090;
eprintln!("Hosting web-viewer on http://127.0.0.1:{port}");

// Shutdown server via Ctrl+C
let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);
ctrlc::set_handler(move || {
re_log::debug!("Ctrl-C detected - Closing web server.");
shutdown_tx.send(()).unwrap();
})
.expect("Error setting Ctrl-C handler");

re_web_viewer_server::WebViewerServer::new(port)
.serve()
.serve(shutdown_rx)
.await
.unwrap();
}
19 changes: 14 additions & 5 deletions crates/re_ws_comms/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,26 @@ impl Server {
Ok(Self { listener })
}

/// Accept new connections forever
pub async fn listen(self, rx: Receiver<LogMsg>) -> anyhow::Result<()> {
/// Accept new connections until we get a message on `shutdown_rx`
pub async fn listen(
self,
rx: Receiver<LogMsg>,
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
) -> anyhow::Result<()> {
use anyhow::Context as _;

let history = Arc::new(Mutex::new(Vec::new()));

let log_stream = to_broadcast_stream(rx, history.clone());

while let Ok((tcp_stream, _)) = self.listener.accept().await {
loop {
let (tcp_stream, _) = tokio::select! {
res = self.listener.accept() => res?,
_ = shutdown_rx.recv() => {
return Ok(());
}
};

let peer = tcp_stream
.peer_addr()
.context("connected streams should have a peer address")?;
Expand All @@ -59,8 +70,6 @@ impl Server {
history.clone(),
));
}

Ok(())
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/rerun/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ backtrace = "0.3"
clap = { workspace = true, features = ["derive"] }
mimalloc.workspace = true
puffin_http = "0.11"
ctrlc.workspace = true
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }

# Native unix dependencies:
Expand Down
Loading

1 comment on commit 7c36951

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rust Benchmark

Benchmark suite Current: 7c36951 Previous: 068428b Ratio
datastore/insert/batch/rects/insert 556213 ns/iter (± 2209) 554196 ns/iter (± 3292) 1.00
datastore/latest_at/batch/rects/query 1867 ns/iter (± 9) 1841 ns/iter (± 27) 1.01
datastore/latest_at/missing_components/primary 288 ns/iter (± 0) 286 ns/iter (± 0) 1.01
datastore/latest_at/missing_components/secondaries 437 ns/iter (± 3) 437 ns/iter (± 0) 1
datastore/range/batch/rects/query 153733 ns/iter (± 886) 151607 ns/iter (± 716) 1.01
mono_points_arrow/generate_message_bundles 45836727 ns/iter (± 1177717) 46672403 ns/iter (± 786703) 0.98
mono_points_arrow/generate_messages 127167525 ns/iter (± 1008563) 126100634 ns/iter (± 1180934) 1.01
mono_points_arrow/encode_log_msg 157363987 ns/iter (± 1037693) 156512460 ns/iter (± 760613) 1.01
mono_points_arrow/encode_total 331611449 ns/iter (± 2367937) 331393378 ns/iter (± 1353745) 1.00
mono_points_arrow/decode_log_msg 177944395 ns/iter (± 905577) 179457473 ns/iter (± 790263) 0.99
mono_points_arrow/decode_message_bundles 64827344 ns/iter (± 833889) 63824858 ns/iter (± 1170469) 1.02
mono_points_arrow/decode_total 237827173 ns/iter (± 1586334) 241495289 ns/iter (± 1478256) 0.98
batch_points_arrow/generate_message_bundles 328368 ns/iter (± 844) 328768 ns/iter (± 1074) 1.00
batch_points_arrow/generate_messages 6465 ns/iter (± 23) 6478 ns/iter (± 24) 1.00
batch_points_arrow/encode_log_msg 358110 ns/iter (± 1351) 359020 ns/iter (± 1031) 1.00
batch_points_arrow/encode_total 719557 ns/iter (± 3155) 705416 ns/iter (± 3296) 1.02
batch_points_arrow/decode_log_msg 351267 ns/iter (± 848) 352905 ns/iter (± 1727) 1.00
batch_points_arrow/decode_message_bundles 2094 ns/iter (± 11) 2103 ns/iter (± 12) 1.00
batch_points_arrow/decode_total 355477 ns/iter (± 1358) 353439 ns/iter (± 871) 1.01
arrow_mono_points/insert 6128321436 ns/iter (± 16168249) 6066627807 ns/iter (± 32901540) 1.01
arrow_mono_points/query 1798183 ns/iter (± 9585) 1789086 ns/iter (± 19184) 1.01
arrow_batch_points/insert 2676961 ns/iter (± 11680) 2664298 ns/iter (± 13658) 1.00
arrow_batch_points/query 16167 ns/iter (± 84) 16143 ns/iter (± 30) 1.00
arrow_batch_vecs/insert 41546 ns/iter (± 254) 42639 ns/iter (± 166) 0.97
arrow_batch_vecs/query 384478 ns/iter (± 3191) 389455 ns/iter (± 801) 0.99
tuid/Tuid::random 34 ns/iter (± 0) 34 ns/iter (± 0) 1

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.