Skip to content

Commit

Permalink
Simpler SIGINT handling (#2198)
Browse files Browse the repository at this point in the history
* Call exit(42) on SIGINT when running `python -m rerun`

* Make shutdown_rx optional for re_web_viewer_server

* Remove SIGINT from crash_handler

* Remove shutdown from re_ws_comms::server

* Remove our complex ctrl-c shutdown handling

* cargo fmt

* Fix doctest

* fix typo

Co-authored-by: Jan Procházka <[email protected]>

* Use u64::MAX

---------

Co-authored-by: Jan Procházka <[email protected]>
  • Loading branch information
emilk and jprochazk authored May 24, 2023
1 parent 7b686de commit 165de62
Show file tree
Hide file tree
Showing 18 changed files with 51 additions and 111 deletions.
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 4 additions & 17 deletions crates/re_sdk_comms/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,13 @@ impl Default for ServerOptions {
/// # use re_sdk_comms::{serve, ServerOptions};
/// #[tokio::main]
/// async fn main() {
/// let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);
/// let log_msg_rx = serve("0.0.0.0", 80, ServerOptions::default(), shutdown_rx).await.unwrap();
/// let log_msg_rx = serve("0.0.0.0", 80, ServerOptions::default()).await.unwrap();
/// }
/// ```
pub async fn serve(
bind_ip: &str,
port: u16,
options: ServerOptions,
shutdown_rx: tokio::sync::broadcast::Receiver<()>,
) -> anyhow::Result<Receiver<LogMsg>> {
let (tx, rx) = re_smart_channel::smart_channel(
// NOTE: We don't know until we start actually accepting clients!
Expand All @@ -67,25 +65,14 @@ pub async fn serve(
);
}

tokio::spawn(listen_for_new_clients(listener, options, tx, shutdown_rx));
tokio::spawn(listen_for_new_clients(listener, options, tx));

Ok(rx)
}

async fn listen_for_new_clients(
listener: TcpListener,
options: ServerOptions,
tx: Sender<LogMsg>,
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
) {
async fn listen_for_new_clients(listener: TcpListener, options: ServerOptions, tx: Sender<LogMsg>) {
loop {
let incoming = tokio::select! {
res = listener.accept() => res,
_ = shutdown_rx.recv() => {
return;
}
};
match incoming {
match listener.accept().await {
Ok((stream, _)) => {
let addr = stream.peer_addr().ok();
let tx = tx.clone_as(re_smart_channel::SmartMessageSource::TcpClient { addr });
Expand Down
11 changes: 0 additions & 11 deletions crates/re_viewer/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,6 @@ pub struct App {
/// What is serialized
state: AppState,

/// Set to `true` on Ctrl-C.
shutdown: std::sync::Arc<std::sync::atomic::AtomicBool>,

/// Pending background tasks, using `poll_promise`.
pending_promises: HashMap<String, Promise<Box<dyn Any + Send>>>,

Expand Down Expand Up @@ -108,7 +105,6 @@ impl App {
re_ui: re_ui::ReUi,
storage: Option<&dyn eframe::Storage>,
rx: Receiver<LogMsg>,
shutdown: std::sync::Arc<std::sync::atomic::AtomicBool>,
) -> Self {
let (logger, text_log_rx) = re_log::ChannelLogger::new(re_log::LevelFilter::Info);
if re_log::add_boxed_logger(Box::new(logger)).is_err() {
Expand Down Expand Up @@ -139,7 +135,6 @@ impl App {
rx,
log_dbs: Default::default(),
state,
shutdown,
pending_promises: Default::default(),
toasts: toasts::Toasts::new(),
memory_panel: Default::default(),
Expand Down Expand Up @@ -486,12 +481,6 @@ impl eframe::App for App {
self.ram_limit_warner.update();
}

if self.shutdown.load(std::sync::atomic::Ordering::Relaxed) {
#[cfg(not(target_arch = "wasm32"))]
frame.close();
return;
}

#[cfg(not(target_arch = "wasm32"))]
{
// Ensure zoom factor is sane and in 10% steps at all times before applying it.
Expand Down
1 change: 0 additions & 1 deletion crates/re_viewer/src/native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ pub fn run_native_viewer_with_messages(
re_ui,
cc.storage,
rx,
std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
))
}))
}
1 change: 0 additions & 1 deletion crates/re_viewer/src/remote_viewer_app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ impl RemoteViewerApp {
self.re_ui.clone(),
storage,
rx,
std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
);

self.app = Some((connection, app));
Expand Down
2 changes: 0 additions & 2 deletions crates/re_viewer/src/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ impl WebHandle {
re_ui,
cc.storage,
rx,
std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
))
}
EndpointCategory::WebEventListener => {
Expand Down Expand Up @@ -135,7 +134,6 @@ impl WebHandle {
re_ui,
cc.storage,
rx,
std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
))
}
EndpointCategory::WebSocket(url) => {
Expand Down
1 change: 0 additions & 1 deletion crates/re_web_viewer_server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ analytics = ["dep:re_analytics"]
[dependencies]
re_log.workspace = true

ctrlc.workspace = true
document-features = "0.2"
futures-util = "0.3"
hyper = { version = "0.14", features = ["full"] }
Expand Down
14 changes: 10 additions & 4 deletions crates/re_web_viewer_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,9 @@ impl WebViewerServer {
/// ``` no_run
/// # use re_web_viewer_server::{WebViewerServer, WebViewerServerPort, WebViewerServerError};
/// # async fn example() -> Result<(), WebViewerServerError> {
/// let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);
/// let server = WebViewerServer::new("0.0.0.0", WebViewerServerPort::AUTO)?;
/// let server_url = server.server_url();
/// server.serve(shutdown_rx).await?;
/// server.serve().await?;
/// # Ok(()) }
/// ```
pub fn new(bind_ip: &str, port: WebViewerServerPort) -> Result<Self, WebViewerServerError> {
Expand All @@ -235,7 +234,14 @@ impl WebViewerServer {
Ok(Self { server })
}

pub async fn serve(
pub async fn serve(self) -> Result<(), WebViewerServerError> {
self.server
.await
.map_err(WebViewerServerError::ServeFailed)?;
Ok(())
}

pub async fn serve_with_graceful_shutdown(
self,
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
) -> Result<(), WebViewerServerError> {
Expand Down Expand Up @@ -286,7 +292,7 @@ impl WebViewerServerHandle {

let local_addr = web_server.server.local_addr();

tokio::spawn(async move { web_server.serve(shutdown_rx).await });
tokio::spawn(async move { web_server.serve_with_graceful_shutdown(shutdown_rx).await });

let slf = Self {
local_addr,
Expand Down
10 changes: 1 addition & 9 deletions crates/re_web_viewer_server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,6 @@ async fn main() {
use clap::Parser as _;
let args = Args::parse();

// Shutdown server via Ctrl+C
let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);
ctrlc::set_handler(move || {
re_log::debug!("Ctrl-C detected - Closing web server.");
shutdown_tx.send(()).unwrap();
})
.expect("Error setting Ctrl-C handler");

let bind_ip = &args.bind;
let server = re_web_viewer_server::WebViewerServer::new(
bind_ip,
Expand All @@ -53,5 +45,5 @@ async fn main() {
}
}

server.serve(shutdown_rx).await.unwrap();
server.serve().await.unwrap();
}
14 changes: 12 additions & 2 deletions crates/re_ws_comms/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,14 @@ impl RerunServer {
Ok(slf)
}

/// Accept new connections
pub async fn listen(self, rx: Receiver<LogMsg>) -> Result<(), RerunServerError> {
let (_shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);
self.listen_with_graceful_shutdown(rx, shutdown_rx).await
}

/// Accept new connections until we get a message on `shutdown_rx`
pub async fn listen(
pub async fn listen_with_graceful_shutdown(
self,
rx: Receiver<LogMsg>,
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
Expand Down Expand Up @@ -122,7 +128,11 @@ impl RerunServerHandle {

let local_addr = ws_server.local_addr;

tokio::spawn(async move { ws_server.listen(rerun_rx, shutdown_rx).await });
tokio::spawn(async move {
ws_server
.listen_with_graceful_shutdown(rerun_rx, shutdown_rx)
.await
});

Ok(Self {
local_addr,
Expand Down
1 change: 0 additions & 1 deletion crates/rerun/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ webbrowser = { version = "0.8", optional = true }
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
backtrace = "0.3"
clap = { workspace = true, features = ["derive"] }
ctrlc.workspace = true
puffin.workspace = true
rayon.workspace = true
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
Expand Down
10 changes: 4 additions & 6 deletions crates/rerun/src/clap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,10 @@ impl RerunArgs {

#[cfg(feature = "web_viewer")]
if matches!(self.to_behavior(), Ok(RerunBehavior::Serve)) {
use anyhow::Context as _;

let (mut shutdown_rx, _) = crate::run::setup_ctrl_c_handler();
return tokio_runtime_handle
.block_on(async { shutdown_rx.recv().await })
.context("Failed to wait for shutdown signal.");
// Sleep waiting for Ctrl-C:
tokio_runtime_handle.block_on(async {
tokio::time::sleep(std::time::Duration::from_secs(u64::MAX)).await;
});
}

Ok(())
Expand Down
1 change: 0 additions & 1 deletion crates/rerun/src/crash_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ fn install_signal_handler(build_info: BuildInfo) {
libc::SIGBUS,
libc::SIGFPE,
libc::SIGILL,
libc::SIGINT,
libc::SIGSEGV,
libc::SIGTERM,
] {
Expand Down
1 change: 0 additions & 1 deletion crates/rerun/src/native_viewer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ where
re_ui,
cc.storage,
rx,
std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
))
}))
}
Expand Down
Loading

0 comments on commit 165de62

Please sign in to comment.