Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions mm2src/mm2_event_stream/src/streamer_ids.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ const NETWORK: &str = "NETWORK";
const HEARTBEAT: &str = "HEARTBEAT";
const SWAP_STATUS: &str = "SWAP_STATUS";
const ORDER_STATUS: &str = "ORDER_STATUS";
#[cfg(not(any(target_arch = "wasm32", target_os = "windows")))]
const SHUTDOWN_SIGNAL: &str = "SHUTDOWN_SIGNAL";

const TASK_PREFIX: &str = "TASK:";
const BALANCE_PREFIX: &str = "BALANCE:";
Expand Down Expand Up @@ -44,6 +46,8 @@ pub enum StreamerId {
ForTesting {
test_streamer: String,
},
#[cfg(not(any(target_arch = "wasm32", target_os = "windows")))]
ShutdownSignal,
}

impl fmt::Display for StreamerId {
Expand All @@ -53,6 +57,8 @@ impl fmt::Display for StreamerId {
StreamerId::Heartbeat => write!(f, "{HEARTBEAT}"),
StreamerId::SwapStatus => write!(f, "{SWAP_STATUS}"),
StreamerId::OrderStatus => write!(f, "{ORDER_STATUS}"),
#[cfg(not(any(target_arch = "wasm32", target_os = "windows")))]
StreamerId::ShutdownSignal => write!(f, "{SHUTDOWN_SIGNAL}"),
StreamerId::Task { task_id } => write!(f, "{TASK_PREFIX}{task_id}"),
StreamerId::Balance { coin } => write!(f, "{BALANCE_PREFIX}{coin}"),
StreamerId::TxHistory { coin } => write!(f, "{TX_HISTORY_PREFIX}{coin}"),
Expand Down Expand Up @@ -97,6 +103,8 @@ impl<'de> Deserialize<'de> for StreamerId {
HEARTBEAT => Ok(StreamerId::Heartbeat),
SWAP_STATUS => Ok(StreamerId::SwapStatus),
ORDER_STATUS => Ok(StreamerId::OrderStatus),
#[cfg(not(any(target_arch = "wasm32", target_os = "windows")))]
SHUTDOWN_SIGNAL => Ok(StreamerId::ShutdownSignal),
v if v.starts_with(TASK_PREFIX) => Ok(StreamerId::Task {
task_id: v[TASK_PREFIX.len()..].parse().map_err(de::Error::custom)?,
}),
Expand Down
5 changes: 4 additions & 1 deletion mm2src/mm2_main/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,14 @@ rcgen.workspace = true
rustls = { workspace = true, default-features = false }
rustls-pemfile.workspace = true
timed-map = { workspace = true, features = ["rustc-hash"] }
tokio = { workspace = true, features = ["io-util", "rt-multi-thread", "net", "signal"] }
tokio = { workspace = true, features = ["io-util", "rt-multi-thread", "net"] }

[target.'cfg(windows)'.dependencies]
winapi.workspace = true

[target.'cfg(not(any(target_arch = "wasm32", target_os = "windows")))'.dependencies]
signal-hook-tokio = { version = "0.3", features = [ "futures-v0_3" ] }

[dev-dependencies]
coins = { path = "../coins", features = ["for-tests"] }
coins_activation = { path = "../coins_activation", features = ["for-tests"] }
Expand Down
44 changes: 33 additions & 11 deletions mm2src/mm2_main/src/mm2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ pub mod lp_stats;
pub mod lp_swap;
pub mod lp_wallet;
pub mod rpc;
#[cfg(not(any(target_arch = "wasm32", target_os = "windows")))]
pub mod shutdown_signal_event;
mod swap_versioning;
#[cfg(all(target_arch = "wasm32", test))]
mod wasm_tests;
Expand Down Expand Up @@ -198,8 +200,8 @@ pub async fn lp_main(
.into_mm_arc();
ctx_cb(try_s!(ctx.ffi_handle()));

#[cfg(not(target_arch = "wasm32"))]
spawn_ctrl_c_handler(ctx.clone());
#[cfg(not(any(target_arch = "wasm32", target_os = "windows")))]
spawn_os_signal_handler(ctx.clone());

try_s!(lp_init(ctx.clone(), version, datetime).await);
Ok(ctx)
Expand All @@ -218,22 +220,42 @@ pub async fn lp_run(ctx: MmArc) {
lp_swap::clear_running_swaps(&ctx);
}

/// Handles CTRL-C signals and shutdowns the KDF runtime gracefully.
/// Handles various OS signals and shutdowns the KDF runtime gracefully.
///
/// It's important to spawn this task as soon as `Ctx` is in the correct state.
#[cfg(not(target_arch = "wasm32"))]
fn spawn_ctrl_c_handler(ctx: MmArc) {
#[cfg(not(any(target_arch = "wasm32", target_os = "windows")))]
fn spawn_os_signal_handler(ctx: MmArc) {
use crate::lp_dispatcher::{dispatch_lp_event, StopCtxEvent};
use futures::StreamExt;

common::executor::spawn(async move {
tokio::signal::ctrl_c()
.await
.expect("Couldn't listen for the CTRL-C signal.");
let signals_to_handle = [libc::SIGINT, libc::SIGTERM, libc::SIGQUIT];
let mut signals =
signal_hook_tokio::Signals::new(signals_to_handle).expect("Couldn't listen for the CTRL-C signal.");

let Some(signal) = signals.next().await else {
log::error!("Could not catch the OS signal.");
return;
};

log::info!("Wrapping things up and shutting down...");
let signal_name = match signal {
libc::SIGINT => "SIGINT".to_owned(),
libc::SIGTERM => "SIGTERM".to_owned(),
libc::SIGQUIT => "SIGQUIT".to_owned(),
_ => format!("UNKNOWN({signal})"),
};
Comment on lines +241 to +246
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we intend to handle any signal, shouldn't we init our signal hook with all signals (if there is a catch all thing).

not really a problem since we have the important signals covered, but looking at this, the unknown signal line would never be triggered (though i would prefer it like that instead of unreachable! so i don't have a problem with that either).

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we intend to handle any signal

No, not any signal. We can't handle every signal (e.g., we can't handle SIGKILL). I wanted to handle ones (the obvious ones) that can be handled.


ctx.event_stream_manager
.send(&mm2_event_stream::StreamerId::ShutdownSignal, signal_name.clone())
.unwrap();

dispatch_lp_event(ctx.clone(), StopCtxEvent.into()).await;
ctx.stop().await.expect("Couldn't stop the KDF runtime.");
if signals_to_handle.contains(&signal) {
log::info!("Received {signal_name} signal from the OS. Wrapping things up and shutting down...");
dispatch_lp_event(ctx.clone(), StopCtxEvent.into()).await;
ctx.stop().await.expect("Couldn't stop the KDF runtime.");
} else {
log::warn!("Received a signal ({signal}) from the OS that cannot be handled.");
}
});
}

Expand Down
2 changes: 2 additions & 0 deletions mm2src/mm2_main/src/rpc/dispatcher/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,8 @@ async fn rpc_streaming_dispatcher(
"order_status::enable" => handle_mmrpc(ctx, request, streaming_activations::enable_order_status).await,
"tx_history::enable" => handle_mmrpc(ctx, request, streaming_activations::enable_tx_history).await,
"orderbook::enable" => handle_mmrpc(ctx, request, streaming_activations::enable_orderbook).await,
#[cfg(not(any(target_arch = "wasm32", target_os = "windows")))]
"shutdown_signal::enable" => handle_mmrpc(ctx, request, streaming_activations::enable_shutdown_signal).await,
"disable" => handle_mmrpc(ctx, request, streaming_activations::disable_streamer).await,
_ => MmError::err(DispatcherError::NoSuchMethod),
}
Expand Down
4 changes: 4 additions & 0 deletions mm2src/mm2_main/src/rpc/streaming_activations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ mod heartbeat;
mod network;
mod orderbook;
mod orders;
#[cfg(not(any(target_arch = "wasm32", target_os = "windows")))]
mod shutdown_signal;
mod swaps;
mod tx_history;

Expand All @@ -16,6 +18,8 @@ pub use heartbeat::*;
pub use network::*;
pub use orderbook::*;
pub use orders::*;
#[cfg(not(any(target_arch = "wasm32", target_os = "windows")))]
pub use shutdown_signal::*;
pub use swaps::*;
pub use tx_history::*;

Expand Down
35 changes: 35 additions & 0 deletions mm2src/mm2_main/src/rpc/streaming_activations/shutdown_signal.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
//! RPC activation and deactivation for the shutdown signals.
use super::{EnableStreamingRequest, EnableStreamingResponse};

use crate::shutdown_signal_event::ShutdownSignalEvent;
use common::HttpStatusCode;
use derive_more::Display;
use http::StatusCode;
use mm2_core::mm_ctx::MmArc;
use mm2_err_handle::{map_to_mm::MapToMmResult, mm_error::MmResult};

#[derive(Deserialize)]
pub struct EnableShutdownSignalRequest;

#[derive(Display, Serialize, SerializeErrorType)]
#[serde(tag = "error_type", content = "error_data")]
pub enum ShutdownSignalRequestError {
EnableError(String),
}

impl HttpStatusCode for ShutdownSignalRequestError {
fn status_code(&self) -> StatusCode {
StatusCode::BAD_REQUEST
}
}

pub async fn enable_shutdown_signal(
ctx: MmArc,
req: EnableStreamingRequest<EnableShutdownSignalRequest>,
) -> MmResult<EnableStreamingResponse, ShutdownSignalRequestError> {
ctx.event_stream_manager
.add(req.client_id, ShutdownSignalEvent, ctx.spawner())
.await
.map(EnableStreamingResponse::new)
.map_to_mm(|e| ShutdownSignalRequestError::EnableError(format!("{e:?}")))
}
30 changes: 30 additions & 0 deletions mm2src/mm2_main/src/shutdown_signal_event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use async_trait::async_trait;
use futures::channel::oneshot;
use futures::StreamExt;
use mm2_event_stream::{Broadcaster, Event, EventStreamer, StreamHandlerInput, StreamerId};

pub struct ShutdownSignalEvent;

#[async_trait]
impl EventStreamer for ShutdownSignalEvent {
type DataInType = String;

fn streamer_id(&self) -> StreamerId {
StreamerId::ShutdownSignal
}

async fn handle(
self,
broadcaster: Broadcaster,
ready_tx: oneshot::Sender<Result<(), String>>,
mut data_rx: impl StreamHandlerInput<Self::DataInType>,
) {
ready_tx
.send(Ok(()))
.expect("Receiver is dropped, which should never happen.");

while let Some(signal) = data_rx.next().await {
broadcaster.broadcast(Event::new(self.streamer_id(), json!(signal)));
}
}
}
Loading