Skip to content

Commit

Permalink
Simpler SIGINT handling (#2198)
Browse files Browse the repository at this point in the history
* Call exit(42) on SIGINT when running `python -m rerun`

* Make shutdown_rx optional for re_web_viewer_server

* Remove SIGINT from crash_handler

* Remove shutdown from re_ws_comms::server

* Remove our complex ctrl-c shutdown handling

* cargo fmt

* Fix doctest

* fix typo

Co-authored-by: Jan Procházka <[email protected]>

* Use u64::MAX

---------

Co-authored-by: Jan Procházka <[email protected]>
  • Loading branch information
emilk and jprochazk committed May 25, 2023
1 parent a04711f commit 374fa9a
Show file tree
Hide file tree
Showing 18 changed files with 63 additions and 123 deletions.
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 16 additions & 29 deletions crates/re_sdk_comms/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,46 +28,19 @@ impl Default for ServerOptions {
}
}

async fn listen_for_new_clients(
listener: TcpListener,
options: ServerOptions,
tx: Sender<LogMsg>,
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
) {
loop {
let incoming = tokio::select! {
res = listener.accept() => res,
_ = shutdown_rx.recv() => {
return;
}
};
match incoming {
Ok((stream, _)) => {
let tx = tx.clone();
spawn_client(stream, tx, options);
}
Err(err) => {
re_log::warn!("Failed to accept incoming SDK client: {err}");
}
}
}
}

/// Listen to multiple SDK:s connecting to us over TCP.
///
/// ``` no_run
/// # use re_sdk_comms::{serve, ServerOptions};
/// #[tokio::main]
/// async fn main() {
/// let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);
/// let log_msg_rx = serve("0.0.0.0", 80, ServerOptions::default(), shutdown_rx).await.unwrap();
/// let log_msg_rx = serve("0.0.0.0", 80, ServerOptions::default()).await.unwrap();
/// }
/// ```
pub async fn serve(
bind_ip: &str,
port: u16,
options: ServerOptions,
shutdown_rx: tokio::sync::broadcast::Receiver<()>,
) -> anyhow::Result<Receiver<LogMsg>> {
let (tx, rx) = re_smart_channel::smart_channel(re_smart_channel::Source::TcpServer { port });

Expand All @@ -88,11 +61,25 @@ pub async fn serve(
);
}

tokio::spawn(listen_for_new_clients(listener, options, tx, shutdown_rx));
tokio::spawn(listen_for_new_clients(listener, options, tx));

Ok(rx)
}

async fn listen_for_new_clients(listener: TcpListener, options: ServerOptions, tx: Sender<LogMsg>) {
loop {
match listener.accept().await {
Ok((stream, _)) => {
let tx = tx.clone();
spawn_client(stream, tx, options);
}
Err(err) => {
re_log::warn!("Failed to accept incoming SDK client: {err}");
}
}
}
}

fn spawn_client(stream: TcpStream, tx: Sender<LogMsg>, options: ServerOptions) {
tokio::spawn(async move {
let addr_string = stream
Expand Down
11 changes: 0 additions & 11 deletions crates/re_viewer/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,6 @@ pub struct App {
/// What is serialized
state: AppState,

/// Set to `true` on Ctrl-C.
shutdown: std::sync::Arc<std::sync::atomic::AtomicBool>,

/// Pending background tasks, using `poll_promise`.
pending_promises: HashMap<String, Promise<Box<dyn Any + Send>>>,

Expand Down Expand Up @@ -105,7 +102,6 @@ impl App {
re_ui: re_ui::ReUi,
storage: Option<&dyn eframe::Storage>,
rx: Receiver<LogMsg>,
shutdown: std::sync::Arc<std::sync::atomic::AtomicBool>,
) -> Self {
let (logger, text_log_rx) = re_log::ChannelLogger::new(re_log::LevelFilter::Info);
if re_log::add_boxed_logger(Box::new(logger)).is_err() {
Expand Down Expand Up @@ -142,7 +138,6 @@ impl App {
rx,
log_dbs: Default::default(),
state,
shutdown,
pending_promises: Default::default(),
toasts: toasts::Toasts::new(),
memory_panel: Default::default(),
Expand Down Expand Up @@ -454,12 +449,6 @@ impl eframe::App for App {
self.ram_limit_warner.update();
}

if self.shutdown.load(std::sync::atomic::Ordering::Relaxed) {
#[cfg(not(target_arch = "wasm32"))]
frame.close();
return;
}

#[cfg(not(target_arch = "wasm32"))]
{
// Ensure zoom factor is sane and in 10% steps at all times before applying it.
Expand Down
1 change: 0 additions & 1 deletion crates/re_viewer/src/native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ pub fn run_native_viewer_with_messages(
re_ui,
cc.storage,
rx,
std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
))
}))
}
1 change: 0 additions & 1 deletion crates/re_viewer/src/remote_viewer_app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ impl RemoteViewerApp {
self.re_ui.clone(),
storage,
rx,
std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
);

self.app = Some((connection, app));
Expand Down
2 changes: 0 additions & 2 deletions crates/re_viewer/src/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ impl WebHandle {
re_ui,
cc.storage,
rx,
std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
))
}
EndpointCategory::WebEventListener => {
Expand All @@ -102,7 +101,6 @@ impl WebHandle {
re_ui,
cc.storage,
rx,
std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
))
}
EndpointCategory::WebSocket(url) => {
Expand Down
1 change: 0 additions & 1 deletion crates/re_web_viewer_server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ analytics = ["dep:re_analytics"]
[dependencies]
re_log.workspace = true

ctrlc.workspace = true
document-features = "0.2"
futures-util = "0.3"
hyper = { version = "0.14", features = ["full"] }
Expand Down
14 changes: 10 additions & 4 deletions crates/re_web_viewer_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,9 @@ impl WebViewerServer {
/// ``` no_run
/// # use re_web_viewer_server::{WebViewerServer, WebViewerServerPort, WebViewerServerError};
/// # async fn example() -> Result<(), WebViewerServerError> {
/// let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);
/// let server = WebViewerServer::new("0.0.0.0", WebViewerServerPort::AUTO)?;
/// let server_url = server.server_url();
/// server.serve(shutdown_rx).await?;
/// server.serve().await?;
/// # Ok(()) }
/// ```
pub fn new(bind_ip: &str, port: WebViewerServerPort) -> Result<Self, WebViewerServerError> {
Expand All @@ -235,7 +234,14 @@ impl WebViewerServer {
Ok(Self { server })
}

pub async fn serve(
pub async fn serve(self) -> Result<(), WebViewerServerError> {
self.server
.await
.map_err(WebViewerServerError::ServeFailed)?;
Ok(())
}

pub async fn serve_with_graceful_shutdown(
self,
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
) -> Result<(), WebViewerServerError> {
Expand Down Expand Up @@ -286,7 +292,7 @@ impl WebViewerServerHandle {

let local_addr = web_server.server.local_addr();

tokio::spawn(async move { web_server.serve(shutdown_rx).await });
tokio::spawn(async move { web_server.serve_with_graceful_shutdown(shutdown_rx).await });

let slf = Self {
local_addr,
Expand Down
10 changes: 1 addition & 9 deletions crates/re_web_viewer_server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,6 @@ async fn main() {
use clap::Parser as _;
let args = Args::parse();

// Shutdown server via Ctrl+C
let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);
ctrlc::set_handler(move || {
re_log::debug!("Ctrl-C detected - Closing web server.");
shutdown_tx.send(()).unwrap();
})
.expect("Error setting Ctrl-C handler");

let bind_ip = &args.bind;
let server = re_web_viewer_server::WebViewerServer::new(
bind_ip,
Expand All @@ -53,5 +45,5 @@ async fn main() {
}
}

server.serve(shutdown_rx).await.unwrap();
server.serve().await.unwrap();
}
14 changes: 12 additions & 2 deletions crates/re_ws_comms/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,14 @@ impl RerunServer {
Ok(slf)
}

/// Accept new connections
pub async fn listen(self, rx: Receiver<LogMsg>) -> Result<(), RerunServerError> {
let (_shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);
self.listen_with_graceful_shutdown(rx, shutdown_rx).await
}

/// Accept new connections until we get a message on `shutdown_rx`
pub async fn listen(
pub async fn listen_with_graceful_shutdown(
self,
rx: Receiver<LogMsg>,
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
Expand Down Expand Up @@ -122,7 +128,11 @@ impl RerunServerHandle {

let local_addr = ws_server.local_addr;

tokio::spawn(async move { ws_server.listen(rerun_rx, shutdown_rx).await });
tokio::spawn(async move {
ws_server
.listen_with_graceful_shutdown(rerun_rx, shutdown_rx)
.await
});

Ok(Self {
local_addr,
Expand Down
1 change: 0 additions & 1 deletion crates/rerun/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ webbrowser = { version = "0.8", optional = true }
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
backtrace = "0.3"
clap = { workspace = true, features = ["derive"] }
ctrlc.workspace = true
puffin.workspace = true
rayon.workspace = true
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
Expand Down
10 changes: 4 additions & 6 deletions crates/rerun/src/clap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,10 @@ impl RerunArgs {

#[cfg(feature = "web_viewer")]
if matches!(self.to_behavior(), Ok(RerunBehavior::Serve)) {
use anyhow::Context as _;

let (mut shutdown_rx, _) = crate::run::setup_ctrl_c_handler();
return tokio_runtime_handle
.block_on(async { shutdown_rx.recv().await })
.context("Failed to wait for shutdown signal.");
// Sleep waiting for Ctrl-C:
tokio_runtime_handle.block_on(async {
tokio::time::sleep(std::time::Duration::from_secs(u64::MAX)).await;
});
}

Ok(())
Expand Down
1 change: 0 additions & 1 deletion crates/rerun/src/crash_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ fn install_signal_handler(build_info: BuildInfo) {
libc::SIGBUS,
libc::SIGFPE,
libc::SIGILL,
libc::SIGINT,
libc::SIGSEGV,
libc::SIGTERM,
] {
Expand Down
1 change: 0 additions & 1 deletion crates/rerun/src/native_viewer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ where
re_ui,
cc.storage,
rx,
std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
))
}))
}
Expand Down
Loading

0 comments on commit 374fa9a

Please sign in to comment.