diff --git a/Cargo.lock b/Cargo.lock index e3d7870b96..7833b880d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4405,6 +4405,7 @@ dependencies = [ "serialization", "serialization_derive", "sia-rust", + "signal-hook-tokio", "sp-runtime-interface", "sp-trie", "spv_validation", @@ -6873,6 +6874,16 @@ dependencies = [ "url", ] +[[package]] +name = "signal-hook" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d881a16cf4426aa584979d30bd82cb33429027e42122b169753d6ef1085ed6e2" +dependencies = [ + "libc", + "signal-hook-registry", +] + [[package]] name = "signal-hook-registry" version = "1.4.0" @@ -6882,6 +6893,18 @@ dependencies = [ "libc", ] +[[package]] +name = "signal-hook-tokio" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "213241f76fb1e37e27de3b6aa1b068a2c333233b59cca6634f634b80a27ecf1e" +dependencies = [ + "futures-core", + "libc", + "signal-hook", + "tokio", +] + [[package]] name = "signature" version = "1.6.4" diff --git a/mm2src/mm2_event_stream/src/streamer_ids.rs b/mm2src/mm2_event_stream/src/streamer_ids.rs index 4cb0466e3a..e1a168e16f 100644 --- a/mm2src/mm2_event_stream/src/streamer_ids.rs +++ b/mm2src/mm2_event_stream/src/streamer_ids.rs @@ -6,6 +6,8 @@ const NETWORK: &str = "NETWORK"; const HEARTBEAT: &str = "HEARTBEAT"; const SWAP_STATUS: &str = "SWAP_STATUS"; const ORDER_STATUS: &str = "ORDER_STATUS"; +#[cfg(not(any(target_arch = "wasm32", target_os = "windows")))] +const SHUTDOWN_SIGNAL: &str = "SHUTDOWN_SIGNAL"; const TASK_PREFIX: &str = "TASK:"; const BALANCE_PREFIX: &str = "BALANCE:"; @@ -44,6 +46,8 @@ pub enum StreamerId { ForTesting { test_streamer: String, }, + #[cfg(not(any(target_arch = "wasm32", target_os = "windows")))] + ShutdownSignal, } impl fmt::Display for StreamerId { @@ -53,6 +57,8 @@ impl fmt::Display for StreamerId { StreamerId::Heartbeat => write!(f, "{HEARTBEAT}"), StreamerId::SwapStatus => write!(f, "{SWAP_STATUS}"), StreamerId::OrderStatus => write!(f, "{ORDER_STATUS}"), + #[cfg(not(any(target_arch = "wasm32", target_os = "windows")))] + StreamerId::ShutdownSignal => write!(f, "{SHUTDOWN_SIGNAL}"), StreamerId::Task { task_id } => write!(f, "{TASK_PREFIX}{task_id}"), StreamerId::Balance { coin } => write!(f, "{BALANCE_PREFIX}{coin}"), StreamerId::TxHistory { coin } => write!(f, "{TX_HISTORY_PREFIX}{coin}"), @@ -97,6 +103,8 @@ impl<'de> Deserialize<'de> for StreamerId { HEARTBEAT => Ok(StreamerId::Heartbeat), SWAP_STATUS => Ok(StreamerId::SwapStatus), ORDER_STATUS => Ok(StreamerId::OrderStatus), + #[cfg(not(any(target_arch = "wasm32", target_os = "windows")))] + SHUTDOWN_SIGNAL => Ok(StreamerId::ShutdownSignal), v if v.starts_with(TASK_PREFIX) => Ok(StreamerId::Task { task_id: v[TASK_PREFIX.len()..].parse().map_err(de::Error::custom)?, }), diff --git a/mm2src/mm2_main/Cargo.toml b/mm2src/mm2_main/Cargo.toml index c28dc9714c..e67fb0d438 100644 --- a/mm2src/mm2_main/Cargo.toml +++ b/mm2src/mm2_main/Cargo.toml @@ -128,11 +128,14 @@ rcgen.workspace = true rustls = { workspace = true, default-features = false } rustls-pemfile.workspace = true timed-map = { workspace = true, features = ["rustc-hash"] } -tokio = { workspace = true, features = ["io-util", "rt-multi-thread", "net", "signal"] } +tokio = { workspace = true, features = ["io-util", "rt-multi-thread", "net"] } [target.'cfg(windows)'.dependencies] winapi.workspace = true +[target.'cfg(not(any(target_arch = "wasm32", target_os = "windows")))'.dependencies] +signal-hook-tokio = { version = "0.3", features = [ "futures-v0_3" ] } + [dev-dependencies] coins = { path = "../coins", features = ["for-tests"] } coins_activation = { path = "../coins_activation", features = ["for-tests"] } diff --git a/mm2src/mm2_main/src/mm2.rs b/mm2src/mm2_main/src/mm2.rs index 58931daed9..bec263d0cd 100644 --- a/mm2src/mm2_main/src/mm2.rs +++ b/mm2src/mm2_main/src/mm2.rs @@ -85,6 +85,8 @@ pub mod lp_stats; pub mod lp_swap; pub mod lp_wallet; pub mod rpc; +#[cfg(not(any(target_arch = "wasm32", target_os = "windows")))] +pub mod shutdown_signal_event; mod swap_versioning; #[cfg(all(target_arch = "wasm32", test))] mod wasm_tests; @@ -198,8 +200,8 @@ pub async fn lp_main( .into_mm_arc(); ctx_cb(try_s!(ctx.ffi_handle())); - #[cfg(not(target_arch = "wasm32"))] - spawn_ctrl_c_handler(ctx.clone()); + #[cfg(not(any(target_arch = "wasm32", target_os = "windows")))] + spawn_os_signal_handler(ctx.clone()); try_s!(lp_init(ctx.clone(), version, datetime).await); Ok(ctx) @@ -218,22 +220,42 @@ pub async fn lp_run(ctx: MmArc) { lp_swap::clear_running_swaps(&ctx); } -/// Handles CTRL-C signals and shutdowns the KDF runtime gracefully. +/// Handles various OS signals and shutdowns the KDF runtime gracefully. /// /// It's important to spawn this task as soon as `Ctx` is in the correct state. -#[cfg(not(target_arch = "wasm32"))] -fn spawn_ctrl_c_handler(ctx: MmArc) { +#[cfg(not(any(target_arch = "wasm32", target_os = "windows")))] +fn spawn_os_signal_handler(ctx: MmArc) { use crate::lp_dispatcher::{dispatch_lp_event, StopCtxEvent}; + use futures::StreamExt; common::executor::spawn(async move { - tokio::signal::ctrl_c() - .await - .expect("Couldn't listen for the CTRL-C signal."); + let signals_to_handle = [libc::SIGINT, libc::SIGTERM, libc::SIGQUIT]; + let mut signals = + signal_hook_tokio::Signals::new(signals_to_handle).expect("Couldn't listen for the CTRL-C signal."); + + let Some(signal) = signals.next().await else { + log::error!("Could not catch the OS signal."); + return; + }; - log::info!("Wrapping things up and shutting down..."); + let signal_name = match signal { + libc::SIGINT => "SIGINT".to_owned(), + libc::SIGTERM => "SIGTERM".to_owned(), + libc::SIGQUIT => "SIGQUIT".to_owned(), + _ => format!("UNKNOWN({signal})"), + }; + + ctx.event_stream_manager + .send(&mm2_event_stream::StreamerId::ShutdownSignal, signal_name.clone()) + .unwrap(); - dispatch_lp_event(ctx.clone(), StopCtxEvent.into()).await; - ctx.stop().await.expect("Couldn't stop the KDF runtime."); + if signals_to_handle.contains(&signal) { + log::info!("Received {signal_name} signal from the OS. Wrapping things up and shutting down..."); + dispatch_lp_event(ctx.clone(), StopCtxEvent.into()).await; + ctx.stop().await.expect("Couldn't stop the KDF runtime."); + } else { + log::warn!("Received a signal ({signal}) from the OS that cannot be handled."); + } }); } diff --git a/mm2src/mm2_main/src/rpc/dispatcher/dispatcher.rs b/mm2src/mm2_main/src/rpc/dispatcher/dispatcher.rs index c724175f45..6bce256097 100644 --- a/mm2src/mm2_main/src/rpc/dispatcher/dispatcher.rs +++ b/mm2src/mm2_main/src/rpc/dispatcher/dispatcher.rs @@ -430,6 +430,8 @@ async fn rpc_streaming_dispatcher( "order_status::enable" => handle_mmrpc(ctx, request, streaming_activations::enable_order_status).await, "tx_history::enable" => handle_mmrpc(ctx, request, streaming_activations::enable_tx_history).await, "orderbook::enable" => handle_mmrpc(ctx, request, streaming_activations::enable_orderbook).await, + #[cfg(not(any(target_arch = "wasm32", target_os = "windows")))] + "shutdown_signal::enable" => handle_mmrpc(ctx, request, streaming_activations::enable_shutdown_signal).await, "disable" => handle_mmrpc(ctx, request, streaming_activations::disable_streamer).await, _ => MmError::err(DispatcherError::NoSuchMethod), } diff --git a/mm2src/mm2_main/src/rpc/streaming_activations/mod.rs b/mm2src/mm2_main/src/rpc/streaming_activations/mod.rs index 6aa5ca04da..b75577fa9f 100644 --- a/mm2src/mm2_main/src/rpc/streaming_activations/mod.rs +++ b/mm2src/mm2_main/src/rpc/streaming_activations/mod.rs @@ -5,6 +5,8 @@ mod heartbeat; mod network; mod orderbook; mod orders; +#[cfg(not(any(target_arch = "wasm32", target_os = "windows")))] +mod shutdown_signal; mod swaps; mod tx_history; @@ -16,6 +18,8 @@ pub use heartbeat::*; pub use network::*; pub use orderbook::*; pub use orders::*; +#[cfg(not(any(target_arch = "wasm32", target_os = "windows")))] +pub use shutdown_signal::*; pub use swaps::*; pub use tx_history::*; diff --git a/mm2src/mm2_main/src/rpc/streaming_activations/shutdown_signal.rs b/mm2src/mm2_main/src/rpc/streaming_activations/shutdown_signal.rs new file mode 100644 index 0000000000..4e5603194d --- /dev/null +++ b/mm2src/mm2_main/src/rpc/streaming_activations/shutdown_signal.rs @@ -0,0 +1,35 @@ +//! RPC activation and deactivation for the shutdown signals. +use super::{EnableStreamingRequest, EnableStreamingResponse}; + +use crate::shutdown_signal_event::ShutdownSignalEvent; +use common::HttpStatusCode; +use derive_more::Display; +use http::StatusCode; +use mm2_core::mm_ctx::MmArc; +use mm2_err_handle::{map_to_mm::MapToMmResult, mm_error::MmResult}; + +#[derive(Deserialize)] +pub struct EnableShutdownSignalRequest; + +#[derive(Display, Serialize, SerializeErrorType)] +#[serde(tag = "error_type", content = "error_data")] +pub enum ShutdownSignalRequestError { + EnableError(String), +} + +impl HttpStatusCode for ShutdownSignalRequestError { + fn status_code(&self) -> StatusCode { + StatusCode::BAD_REQUEST + } +} + +pub async fn enable_shutdown_signal( + ctx: MmArc, + req: EnableStreamingRequest, +) -> MmResult { + ctx.event_stream_manager + .add(req.client_id, ShutdownSignalEvent, ctx.spawner()) + .await + .map(EnableStreamingResponse::new) + .map_to_mm(|e| ShutdownSignalRequestError::EnableError(format!("{e:?}"))) +} diff --git a/mm2src/mm2_main/src/shutdown_signal_event.rs b/mm2src/mm2_main/src/shutdown_signal_event.rs new file mode 100644 index 0000000000..f7e3340819 --- /dev/null +++ b/mm2src/mm2_main/src/shutdown_signal_event.rs @@ -0,0 +1,30 @@ +use async_trait::async_trait; +use futures::channel::oneshot; +use futures::StreamExt; +use mm2_event_stream::{Broadcaster, Event, EventStreamer, StreamHandlerInput, StreamerId}; + +pub struct ShutdownSignalEvent; + +#[async_trait] +impl EventStreamer for ShutdownSignalEvent { + type DataInType = String; + + fn streamer_id(&self) -> StreamerId { + StreamerId::ShutdownSignal + } + + async fn handle( + self, + broadcaster: Broadcaster, + ready_tx: oneshot::Sender>, + mut data_rx: impl StreamHandlerInput, + ) { + ready_tx + .send(Ok(())) + .expect("Receiver is dropped, which should never happen."); + + while let Some(signal) = data_rx.next().await { + broadcaster.broadcast(Event::new(self.streamer_id(), json!(signal))); + } + } +}