Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simpler SIGINT handling #2198

Merged
merged 9 commits into from
May 24, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 4 additions & 17 deletions crates/re_sdk_comms/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,13 @@ impl Default for ServerOptions {
/// # use re_sdk_comms::{serve, ServerOptions};
/// #[tokio::main]
/// async fn main() {
/// let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);
/// let log_msg_rx = serve("0.0.0.0", 80, ServerOptions::default(), shutdown_rx).await.unwrap();
/// let log_msg_rx = serve("0.0.0.0", 80, ServerOptions::default()).await.unwrap();
/// }
/// ```
pub async fn serve(
bind_ip: &str,
port: u16,
options: ServerOptions,
shutdown_rx: tokio::sync::broadcast::Receiver<()>,
) -> anyhow::Result<Receiver<LogMsg>> {
let (tx, rx) = re_smart_channel::smart_channel(
// NOTE: We don't know until we start actually accepting clients!
Expand All @@ -67,25 +65,14 @@ pub async fn serve(
);
}

tokio::spawn(listen_for_new_clients(listener, options, tx, shutdown_rx));
tokio::spawn(listen_for_new_clients(listener, options, tx));

Ok(rx)
}

async fn listen_for_new_clients(
listener: TcpListener,
options: ServerOptions,
tx: Sender<LogMsg>,
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
) {
async fn listen_for_new_clients(listener: TcpListener, options: ServerOptions, tx: Sender<LogMsg>) {
loop {
let incoming = tokio::select! {
res = listener.accept() => res,
_ = shutdown_rx.recv() => {
return;
}
};
match incoming {
match listener.accept().await {
Ok((stream, _)) => {
let addr = stream.peer_addr().ok();
let tx = tx.clone_as(re_smart_channel::SmartMessageSource::TcpClient { addr });
Expand Down
11 changes: 0 additions & 11 deletions crates/re_viewer/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,6 @@ pub struct App {
/// What is serialized
state: AppState,

/// Set to `true` on Ctrl-C.
shutdown: std::sync::Arc<std::sync::atomic::AtomicBool>,

/// Pending background tasks, using `poll_promise`.
pending_promises: HashMap<String, Promise<Box<dyn Any + Send>>>,

Expand Down Expand Up @@ -108,7 +105,6 @@ impl App {
re_ui: re_ui::ReUi,
storage: Option<&dyn eframe::Storage>,
rx: Receiver<LogMsg>,
shutdown: std::sync::Arc<std::sync::atomic::AtomicBool>,
) -> Self {
let (logger, text_log_rx) = re_log::ChannelLogger::new(re_log::LevelFilter::Info);
if re_log::add_boxed_logger(Box::new(logger)).is_err() {
Expand Down Expand Up @@ -139,7 +135,6 @@ impl App {
rx,
log_dbs: Default::default(),
state,
shutdown,
pending_promises: Default::default(),
toasts: toasts::Toasts::new(),
memory_panel: Default::default(),
Expand Down Expand Up @@ -486,12 +481,6 @@ impl eframe::App for App {
self.ram_limit_warner.update();
}

if self.shutdown.load(std::sync::atomic::Ordering::Relaxed) {
#[cfg(not(target_arch = "wasm32"))]
frame.close();
return;
}

#[cfg(not(target_arch = "wasm32"))]
{
// Ensure zoom factor is sane and in 10% steps at all times before applying it.
Expand Down
1 change: 0 additions & 1 deletion crates/re_viewer/src/native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ pub fn run_native_viewer_with_messages(
re_ui,
cc.storage,
rx,
std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
))
}))
}
1 change: 0 additions & 1 deletion crates/re_viewer/src/remote_viewer_app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ impl RemoteViewerApp {
self.re_ui.clone(),
storage,
rx,
std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
);

self.app = Some((connection, app));
Expand Down
2 changes: 0 additions & 2 deletions crates/re_viewer/src/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ impl WebHandle {
re_ui,
cc.storage,
rx,
std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
))
}
EndpointCategory::WebEventListener => {
Expand Down Expand Up @@ -135,7 +134,6 @@ impl WebHandle {
re_ui,
cc.storage,
rx,
std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
))
}
EndpointCategory::WebSocket(url) => {
Expand Down
1 change: 0 additions & 1 deletion crates/re_web_viewer_server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ analytics = ["dep:re_analytics"]
[dependencies]
re_log.workspace = true

ctrlc.workspace = true
document-features = "0.2"
futures-util = "0.3"
hyper = { version = "0.14", features = ["full"] }
Expand Down
14 changes: 10 additions & 4 deletions crates/re_web_viewer_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,9 @@ impl WebViewerServer {
/// ``` no_run
/// # use re_web_viewer_server::{WebViewerServer, WebViewerServerPort, WebViewerServerError};
/// # async fn example() -> Result<(), WebViewerServerError> {
/// let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);
/// let server = WebViewerServer::new("0.0.0.0", WebViewerServerPort::AUTO)?;
/// let server_url = server.server_url();
/// server.serve(shutdown_rx).await?;
/// server.serve().await?;
/// # Ok(()) }
/// ```
pub fn new(bind_ip: &str, port: WebViewerServerPort) -> Result<Self, WebViewerServerError> {
Expand All @@ -235,7 +234,14 @@ impl WebViewerServer {
Ok(Self { server })
}

pub async fn serve(
pub async fn serve(self) -> Result<(), WebViewerServerError> {
self.server
.await
.map_err(WebViewerServerError::ServeFailed)?;
Ok(())
}

pub async fn serve_with_graceful_shutdown(
self,
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
) -> Result<(), WebViewerServerError> {
Expand Down Expand Up @@ -286,7 +292,7 @@ impl WebViewerServerHandle {

let local_addr = web_server.server.local_addr();

tokio::spawn(async move { web_server.serve(shutdown_rx).await });
tokio::spawn(async move { web_server.serve_with_graceful_shutdown(shutdown_rx).await });

let slf = Self {
local_addr,
Expand Down
10 changes: 1 addition & 9 deletions crates/re_web_viewer_server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,6 @@ async fn main() {
use clap::Parser as _;
let args = Args::parse();

// Shutdown server via Ctrl+C
let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);
ctrlc::set_handler(move || {
re_log::debug!("Ctrl-C detected - Closing web server.");
shutdown_tx.send(()).unwrap();
})
.expect("Error setting Ctrl-C handler");

let bind_ip = &args.bind;
let server = re_web_viewer_server::WebViewerServer::new(
bind_ip,
Expand All @@ -53,5 +45,5 @@ async fn main() {
}
}

server.serve(shutdown_rx).await.unwrap();
server.serve().await.unwrap();
}
14 changes: 12 additions & 2 deletions crates/re_ws_comms/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,14 @@ impl RerunServer {
Ok(slf)
}

/// Accept new connections
pub async fn listen(self, rx: Receiver<LogMsg>) -> Result<(), RerunServerError> {
let (_shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);
self.listen_with_graceful_shutdown(rx, shutdown_rx).await
}

/// Accept new connections until we get a message on `shutdown_rx`
pub async fn listen(
pub async fn listen_with_graceful_shutdown(
self,
rx: Receiver<LogMsg>,
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
Expand Down Expand Up @@ -122,7 +128,11 @@ impl RerunServerHandle {

let local_addr = ws_server.local_addr;

tokio::spawn(async move { ws_server.listen(rerun_rx, shutdown_rx).await });
tokio::spawn(async move {
ws_server
.listen_with_graceful_shutdown(rerun_rx, shutdown_rx)
.await
});

Ok(Self {
local_addr,
Expand Down
1 change: 0 additions & 1 deletion crates/rerun/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ webbrowser = { version = "0.8", optional = true }
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
backtrace = "0.3"
clap = { workspace = true, features = ["derive"] }
ctrlc.workspace = true
puffin.workspace = true
rayon.workspace = true
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
Expand Down
10 changes: 4 additions & 6 deletions crates/rerun/src/clap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,10 @@ impl RerunArgs {

#[cfg(feature = "web_viewer")]
if matches!(self.to_behavior(), Ok(RerunBehavior::Serve)) {
use anyhow::Context as _;

let (mut shutdown_rx, _) = crate::run::setup_ctrl_c_handler();
return tokio_runtime_handle
.block_on(async { shutdown_rx.recv().await })
.context("Failed to wait for shutdown signal.");
// Sleep waiting for Ctrl-C:
tokio_runtime_handle.block_on(async {
tokio::time::sleep(std::time::Duration::from_secs(1_000_000_000)).await;
emilk marked this conversation as resolved.
Show resolved Hide resolved
});
}

Ok(())
Expand Down
1 change: 0 additions & 1 deletion crates/rerun/src/crash_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ fn install_signal_handler(build_info: BuildInfo) {
libc::SIGBUS,
libc::SIGFPE,
libc::SIGILL,
libc::SIGINT,
libc::SIGSEGV,
libc::SIGTERM,
] {
Expand Down
1 change: 0 additions & 1 deletion crates/rerun/src/native_viewer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ where
re_ui,
cc.storage,
rx,
std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
))
}))
}
Expand Down
Loading