From 11e28b78bddcd70ef04498f0b5d662e402cd4ea5 Mon Sep 17 00:00:00 2001 From: BigFish2086 Date: Mon, 12 May 2025 19:39:38 +0300 Subject: [PATCH 1/3] refactor(event-streaming): impl DeriveStreamerId trait for all streamers #2441 - Added DeriveStreamerId trait with InitParam for new and DeriveParam for derive_streamer_id, including lifetime 'a for flexible references. - Refactored streamer structs to use &str for DeriveParam where applicable. --- .../tendermint/tendermint_tx_history_v2.rs | 2 +- .../utxo/rpc_clients/electrum_rpc/client.rs | 2 +- mm2src/coins/utxo/tx_history_events.rs | 12 ++++++---- mm2src/coins/utxo/utxo_balance_events.rs | 11 +++++---- mm2src/coins/utxo/utxo_tx_history_v2.rs | 2 +- .../storage/blockdb/blockdb_sql_storage.rs | 1 + mm2src/coins/z_coin/tx_history_events.rs | 12 ++++++---- .../coins/z_coin/tx_streaming_tests/native.rs | 1 + mm2src/coins/z_coin/z_balance_streaming.rs | 12 ++++++---- mm2src/mm2_event_stream/src/lib.rs | 2 +- mm2src/mm2_event_stream/src/streamer.rs | 8 +++++++ mm2src/mm2_main/src/lp_ordermatch.rs | 24 ++++++++++--------- .../src/lp_ordermatch/order_events.rs | 15 ++++++------ .../src/lp_ordermatch/orderbook_events.rs | 13 ++++++---- mm2src/mm2_main/src/lp_swap/maker_swap.rs | 9 ++++--- mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs | 9 ++++--- mm2src/mm2_main/src/lp_swap/swap_events.rs | 15 ++++++------ mm2src/mm2_main/src/lp_swap/taker_swap.rs | 9 ++++--- mm2src/mm2_main/src/lp_swap/taker_swap_v2.rs | 9 ++++--- .../src/rpc/streaming_activations/balance.rs | 1 + .../rpc/streaming_activations/orderbook.rs | 3 ++- .../src/rpc/streaming_activations/orders.rs | 3 ++- .../src/rpc/streaming_activations/swaps.rs | 3 ++- .../rpc/streaming_activations/tx_history.rs | 1 + 24 files changed, 111 insertions(+), 68 deletions(-) diff --git a/mm2src/coins/tendermint/tendermint_tx_history_v2.rs b/mm2src/coins/tendermint/tendermint_tx_history_v2.rs index 252b9ad4cd..bad70a910e 100644 --- a/mm2src/coins/tendermint/tendermint_tx_history_v2.rs +++ b/mm2src/coins/tendermint/tendermint_tx_history_v2.rs @@ -18,7 +18,7 @@ use cosmrs::tendermint::abci::{Code as TxCode, EventAttribute}; use cosmrs::tx::Fee; use mm2_core::mm_ctx::MmArc; use mm2_err_handle::prelude::MmResult; -use mm2_event_stream::StreamingManager; +use mm2_event_stream::{DeriveStreamerId, StreamingManager}; use mm2_number::BigDecimal; use mm2_state_machine::prelude::*; use mm2_state_machine::state_machine::StateMachineTrait; diff --git a/mm2src/coins/utxo/rpc_clients/electrum_rpc/client.rs b/mm2src/coins/utxo/rpc_clients/electrum_rpc/client.rs index 3cf3996f09..44035eb3e3 100644 --- a/mm2src/coins/utxo/rpc_clients/electrum_rpc/client.rs +++ b/mm2src/coins/utxo/rpc_clients/electrum_rpc/client.rs @@ -51,7 +51,7 @@ use futures::stream::FuturesUnordered; use futures::StreamExt; use futures01::Future; use itertools::Itertools; -use mm2_event_stream::{StreamingManager, StreamingManagerError}; +use mm2_event_stream::{DeriveStreamerId, StreamingManager, StreamingManagerError}; use serde_json::{self as json, Value as Json}; type ElectrumTxHistory = Vec; diff --git a/mm2src/coins/utxo/tx_history_events.rs b/mm2src/coins/utxo/tx_history_events.rs index e0404228a9..536482690b 100644 --- a/mm2src/coins/utxo/tx_history_events.rs +++ b/mm2src/coins/utxo/tx_history_events.rs @@ -1,5 +1,5 @@ use crate::TransactionDetails; -use mm2_event_stream::{Broadcaster, Event, EventStreamer, StreamHandlerInput, StreamerId}; +use mm2_event_stream::{Broadcaster, DeriveStreamerId, Event, EventStreamer, StreamHandlerInput, StreamerId}; use async_trait::async_trait; use futures::channel::oneshot; @@ -9,12 +9,14 @@ pub struct TxHistoryEventStreamer { coin: String, } -impl TxHistoryEventStreamer { - #[inline(always)] - pub fn new(coin: String) -> Self { Self { coin } } +impl<'a> DeriveStreamerId<'a> for TxHistoryEventStreamer { + type InitParam = String; + type DeriveParam = &'a str; + + fn new(coin: Self::InitParam) -> Self { Self { coin } } #[inline(always)] - pub fn derive_streamer_id(coin: &str) -> StreamerId { StreamerId::TxHistory { coin: coin.to_string() } } + fn derive_streamer_id(coin: Self::DeriveParam) -> StreamerId { StreamerId::TxHistory { coin: coin.to_string() } } } #[async_trait] diff --git a/mm2src/coins/utxo/utxo_balance_events.rs b/mm2src/coins/utxo/utxo_balance_events.rs index 9451675cdb..13ef83a4fd 100644 --- a/mm2src/coins/utxo/utxo_balance_events.rs +++ b/mm2src/coins/utxo/utxo_balance_events.rs @@ -12,7 +12,7 @@ use common::log; use futures::channel::oneshot; use futures::StreamExt; use keys::Address; -use mm2_event_stream::{Broadcaster, Event, EventStreamer, StreamHandlerInput, StreamerId}; +use mm2_event_stream::{Broadcaster, DeriveStreamerId, Event, EventStreamer, StreamHandlerInput, StreamerId}; use std::collections::{HashMap, HashSet}; macro_rules! try_or_continue { @@ -31,8 +31,11 @@ pub struct UtxoBalanceEventStreamer { coin: UtxoStandardCoin, } -impl UtxoBalanceEventStreamer { - pub fn new(utxo_arc: UtxoArc) -> Self { +impl<'a> DeriveStreamerId<'a> for UtxoBalanceEventStreamer { + type InitParam = UtxoArc; + type DeriveParam = &'a str; + + fn new(utxo_arc: Self::InitParam) -> Self { Self { // We wrap the UtxoArc in a UtxoStandardCoin for easier method accessibility. // The UtxoArc might belong to a different coin type though. @@ -40,7 +43,7 @@ impl UtxoBalanceEventStreamer { } } - pub fn derive_streamer_id(coin: &str) -> StreamerId { StreamerId::Balance { coin: coin.to_string() } } + fn derive_streamer_id(coin: Self::DeriveParam) -> StreamerId { StreamerId::Balance { coin: coin.to_string() } } } #[async_trait] diff --git a/mm2src/coins/utxo/utxo_tx_history_v2.rs b/mm2src/coins/utxo/utxo_tx_history_v2.rs index 0231bdc4c7..ecfbca7166 100644 --- a/mm2src/coins/utxo/utxo_tx_history_v2.rs +++ b/mm2src/coins/utxo/utxo_tx_history_v2.rs @@ -15,7 +15,7 @@ use common::log::{error, info}; use derive_more::Display; use keys::Address; use mm2_err_handle::prelude::*; -use mm2_event_stream::StreamingManager; +use mm2_event_stream::{DeriveStreamerId, StreamingManager}; use mm2_metrics::MetricsArc; use mm2_number::BigDecimal; use mm2_state_machine::prelude::*; diff --git a/mm2src/coins/z_coin/storage/blockdb/blockdb_sql_storage.rs b/mm2src/coins/z_coin/storage/blockdb/blockdb_sql_storage.rs index 8dd4dd39f7..e1c8940ea8 100644 --- a/mm2src/coins/z_coin/storage/blockdb/blockdb_sql_storage.rs +++ b/mm2src/coins/z_coin/storage/blockdb/blockdb_sql_storage.rs @@ -11,6 +11,7 @@ use db_common::sqlite::{query_single_row, run_optimization_pragmas, rusqlite}; use itertools::Itertools; use mm2_core::mm_ctx::MmArc; use mm2_err_handle::prelude::*; +use mm2_event_stream::DeriveStreamerId; use protobuf::Message; use std::path::PathBuf; use std::sync::{Arc, Mutex}; diff --git a/mm2src/coins/z_coin/tx_history_events.rs b/mm2src/coins/z_coin/tx_history_events.rs index c09da5d732..485c1e8053 100644 --- a/mm2src/coins/z_coin/tx_history_events.rs +++ b/mm2src/coins/z_coin/tx_history_events.rs @@ -4,7 +4,7 @@ use crate::utxo::rpc_clients::UtxoRpcError; use crate::MarketCoinOps; use common::log; use mm2_err_handle::prelude::MmError; -use mm2_event_stream::{Broadcaster, Event, EventStreamer, StreamHandlerInput, StreamerId}; +use mm2_event_stream::{Broadcaster, DeriveStreamerId, Event, EventStreamer, StreamHandlerInput, StreamerId}; use rpc::v1::types::H256 as H256Json; use async_trait::async_trait; @@ -18,12 +18,14 @@ pub struct ZCoinTxHistoryEventStreamer { coin: ZCoin, } -impl ZCoinTxHistoryEventStreamer { - #[inline(always)] - pub fn new(coin: ZCoin) -> Self { Self { coin } } +impl<'a> DeriveStreamerId<'a> for ZCoinTxHistoryEventStreamer { + type InitParam = ZCoin; + type DeriveParam = &'a str; + + fn new(coin: Self::InitParam) -> Self { Self { coin } } #[inline(always)] - pub fn derive_streamer_id(coin: &str) -> StreamerId { StreamerId::TxHistory { coin: coin.to_string() } } + fn derive_streamer_id(coin: Self::DeriveParam) -> StreamerId { StreamerId::TxHistory { coin: coin.to_string() } } } #[async_trait] diff --git a/mm2src/coins/z_coin/tx_streaming_tests/native.rs b/mm2src/coins/z_coin/tx_streaming_tests/native.rs index 6cb7f9a97c..0234b53656 100644 --- a/mm2src/coins/z_coin/tx_streaming_tests/native.rs +++ b/mm2src/coins/z_coin/tx_streaming_tests/native.rs @@ -7,6 +7,7 @@ use crate::{CoinProtocol, DexFee, MarketCoinOps, MmCoin, PrivKeyBuildPolicy}; use common::custom_futures::timeout::FutureTimerExt; use common::{block_on, Future01CompatExt}; use mm2_core::mm_ctx::MmCtxBuilder; +use mm2_event_stream::DeriveStreamerId; use mm2_test_helpers::for_tests::{pirate_conf, ARRR}; use std::time::Duration; diff --git a/mm2src/coins/z_coin/z_balance_streaming.rs b/mm2src/coins/z_coin/z_balance_streaming.rs index c5b012fb3b..45bdaa73d3 100644 --- a/mm2src/coins/z_coin/z_balance_streaming.rs +++ b/mm2src/coins/z_coin/z_balance_streaming.rs @@ -6,18 +6,20 @@ use async_trait::async_trait; use common::log::error; use futures::channel::oneshot; use futures_util::StreamExt; -use mm2_event_stream::{Broadcaster, Event, EventStreamer, StreamHandlerInput, StreamerId}; +use mm2_event_stream::{Broadcaster, DeriveStreamerId, Event, EventStreamer, StreamHandlerInput, StreamerId}; pub struct ZCoinBalanceEventStreamer { coin: ZCoin, } -impl ZCoinBalanceEventStreamer { - #[inline(always)] - pub fn new(coin: ZCoin) -> Self { Self { coin } } +impl<'a> DeriveStreamerId<'a> for ZCoinBalanceEventStreamer { + type InitParam = ZCoin; + type DeriveParam = &'a str; + + fn new(coin: Self::InitParam) -> Self { Self { coin } } #[inline(always)] - pub fn derive_streamer_id(coin: &str) -> StreamerId { StreamerId::Balance { coin: coin.to_string() } } + fn derive_streamer_id(coin: Self::DeriveParam) -> StreamerId { StreamerId::Balance { coin: coin.to_string() } } } #[async_trait] diff --git a/mm2src/mm2_event_stream/src/lib.rs b/mm2src/mm2_event_stream/src/lib.rs index 25a5758fff..55454798b2 100644 --- a/mm2src/mm2_event_stream/src/lib.rs +++ b/mm2src/mm2_event_stream/src/lib.rs @@ -8,5 +8,5 @@ pub mod streamer_ids; pub use configuration::EventStreamingConfiguration; pub use event::Event; pub use manager::{StreamingManager, StreamingManagerError}; -pub use streamer::{Broadcaster, EventStreamer, NoDataIn, StreamHandlerInput}; +pub use streamer::{Broadcaster, DeriveStreamerId, EventStreamer, NoDataIn, StreamHandlerInput}; pub use streamer_ids::StreamerId; diff --git a/mm2src/mm2_event_stream/src/streamer.rs b/mm2src/mm2_event_stream/src/streamer.rs index dee79af8d1..19066f3822 100644 --- a/mm2src/mm2_event_stream/src/streamer.rs +++ b/mm2src/mm2_event_stream/src/streamer.rs @@ -39,6 +39,14 @@ where ); } +pub trait DeriveStreamerId<'a> { + type InitParam; + type DeriveParam: 'a; + + fn new(param: Self::InitParam) -> Self; + fn derive_streamer_id(param: Self::DeriveParam) -> StreamerId; +} + /// Spawns the [`EventStreamer::handle`] in a separate task using [`WeakSpawner`]. /// /// Returns a [`oneshot::Sender`] to shutdown the handler and an optional [`mpsc::UnboundedSender`] diff --git a/mm2src/mm2_main/src/lp_ordermatch.rs b/mm2src/mm2_main/src/lp_ordermatch.rs index 9f57101d42..4f1af03605 100644 --- a/mm2src/mm2_main/src/lp_ordermatch.rs +++ b/mm2src/mm2_main/src/lp_ordermatch.rs @@ -40,7 +40,7 @@ use http::Response; use keys::{AddressFormat, KeyPair}; use mm2_core::mm_ctx::{from_ctx, MmArc, MmWeak}; use mm2_err_handle::prelude::*; -use mm2_event_stream::StreamingManager; +use mm2_event_stream::{DeriveStreamerId, StreamingManager}; use mm2_libp2p::application::request_response::ordermatch::OrdermatchRequest; use mm2_libp2p::application::request_response::P2PRequest; use mm2_libp2p::{decode_signed, encode_and_sign, encode_message, pub_sub_topic, PublicKey, TopicHash, TopicPrefix, @@ -2737,9 +2737,10 @@ impl Orderbook { .insert(order.uuid); self.streaming_manager - .send_fn(&OrderbookStreamer::derive_streamer_id(&order.base, &order.rel), || { - OrderbookItemChangeEvent::NewOrUpdatedItem(Box::new(order.clone().into())) - }) + .send_fn( + &OrderbookStreamer::derive_streamer_id((&order.base, &order.rel)), + || OrderbookItemChangeEvent::NewOrUpdatedItem(Box::new(order.clone().into())), + ) .ok(); self.order_set.insert(order.uuid, order); } @@ -2806,9 +2807,10 @@ impl Orderbook { } self.streaming_manager - .send_fn(&OrderbookStreamer::derive_streamer_id(&order.base, &order.rel), || { - OrderbookItemChangeEvent::RemovedItem(order.uuid) - }) + .send_fn( + &OrderbookStreamer::derive_streamer_id((&order.base, &order.rel)), + || OrderbookItemChangeEvent::RemovedItem(order.uuid), + ) .ok(); Some(order) } @@ -3908,7 +3910,7 @@ async fn process_maker_reserved(ctx: MmArc, from_pubkey: H256Json, reserved_msg: }; ctx.event_stream_manager - .send_fn(&OrderStatusStreamer::derive_streamer_id(), || { + .send_fn(&OrderStatusStreamer::derive_streamer_id(()), || { OrderStatusEvent::TakerMatch(taker_match.clone()) }) .ok(); @@ -3963,7 +3965,7 @@ async fn process_maker_connected(ctx: MmArc, from_pubkey: PublicKey, connected: } ctx.event_stream_manager - .send_fn(&OrderStatusStreamer::derive_streamer_id(), || { + .send_fn(&OrderStatusStreamer::derive_streamer_id(()), || { OrderStatusEvent::TakerConnected(order_match.clone()) }) .ok(); @@ -4079,7 +4081,7 @@ async fn process_taker_request(ctx: MmArc, from_pubkey: H256Json, taker_request: }; ctx.event_stream_manager - .send_fn(&OrderStatusStreamer::derive_streamer_id(), || { + .send_fn(&OrderStatusStreamer::derive_streamer_id(()), || { OrderStatusEvent::MakerMatch(maker_match.clone()) }) .ok(); @@ -4150,7 +4152,7 @@ async fn process_taker_connect(ctx: MmArc, sender_pubkey: PublicKey, connect_msg let order_match = order_match.clone(); ctx.event_stream_manager - .send_fn(&OrderStatusStreamer::derive_streamer_id(), || { + .send_fn(&OrderStatusStreamer::derive_streamer_id(()), || { OrderStatusEvent::MakerConnected(order_match.clone()) }) .ok(); diff --git a/mm2src/mm2_main/src/lp_ordermatch/order_events.rs b/mm2src/mm2_main/src/lp_ordermatch/order_events.rs index 9e0fa97593..39d9a1deb0 100644 --- a/mm2src/mm2_main/src/lp_ordermatch/order_events.rs +++ b/mm2src/mm2_main/src/lp_ordermatch/order_events.rs @@ -1,5 +1,5 @@ use super::{MakerMatch, TakerMatch}; -use mm2_event_stream::{Broadcaster, Event, EventStreamer, StreamHandlerInput, StreamerId}; +use mm2_event_stream::{Broadcaster, DeriveStreamerId, Event, EventStreamer, StreamHandlerInput, StreamerId}; use async_trait::async_trait; use futures::channel::oneshot; @@ -7,12 +7,13 @@ use futures::StreamExt; pub struct OrderStatusStreamer; -impl OrderStatusStreamer { - #[inline(always)] - pub fn new() -> Self { Self } +impl DeriveStreamerId<'_> for OrderStatusStreamer { + type InitParam = (); + type DeriveParam = (); - #[inline(always)] - pub const fn derive_streamer_id() -> StreamerId { StreamerId::OrderStatus } + fn new(_: Self::InitParam) -> Self { Self } + + fn derive_streamer_id(_: Self::DeriveParam) -> StreamerId { StreamerId::OrderStatus } } #[derive(Serialize)] @@ -28,7 +29,7 @@ pub enum OrderStatusEvent { impl EventStreamer for OrderStatusStreamer { type DataInType = OrderStatusEvent; - fn streamer_id(&self) -> StreamerId { Self::derive_streamer_id() } + fn streamer_id(&self) -> StreamerId { Self::derive_streamer_id(()) } async fn handle( self, diff --git a/mm2src/mm2_main/src/lp_ordermatch/orderbook_events.rs b/mm2src/mm2_main/src/lp_ordermatch/orderbook_events.rs index 852da7fb1f..dec9a8bb6f 100644 --- a/mm2src/mm2_main/src/lp_ordermatch/orderbook_events.rs +++ b/mm2src/mm2_main/src/lp_ordermatch/orderbook_events.rs @@ -1,7 +1,7 @@ use super::{orderbook_topic_from_base_rel, subscribe_to_orderbook_topic, OrderbookP2PItem}; use coins::{is_wallet_only_ticker, lp_coinfind}; use mm2_core::mm_ctx::MmArc; -use mm2_event_stream::{Broadcaster, Event, EventStreamer, StreamHandlerInput, StreamerId}; +use mm2_event_stream::{Broadcaster, DeriveStreamerId, Event, EventStreamer, StreamHandlerInput, StreamerId}; use async_trait::async_trait; use futures::channel::oneshot; @@ -14,10 +14,13 @@ pub struct OrderbookStreamer { rel: String, } -impl OrderbookStreamer { - pub fn new(ctx: MmArc, base: String, rel: String) -> Self { Self { ctx, base, rel } } +impl<'a> DeriveStreamerId<'a> for OrderbookStreamer { + type InitParam = (MmArc, String, String); + type DeriveParam = (&'a str, &'a str); - pub fn derive_streamer_id(base: &str, rel: &str) -> StreamerId { + fn new((ctx, base, rel): Self::InitParam) -> Self { Self { ctx, base, rel } } + + fn derive_streamer_id((base, rel): Self::DeriveParam) -> StreamerId { StreamerId::OrderbookUpdate { topic: orderbook_topic_from_base_rel(base, rel), } @@ -38,7 +41,7 @@ pub enum OrderbookItemChangeEvent { impl EventStreamer for OrderbookStreamer { type DataInType = OrderbookItemChangeEvent; - fn streamer_id(&self) -> StreamerId { Self::derive_streamer_id(&self.base, &self.rel) } + fn streamer_id(&self) -> StreamerId { Self::derive_streamer_id((&self.base, &self.rel)) } async fn handle( self, diff --git a/mm2src/mm2_main/src/lp_swap/maker_swap.rs b/mm2src/mm2_main/src/lp_swap/maker_swap.rs index b74953f42b..af6b633441 100644 --- a/mm2src/mm2_main/src/lp_swap/maker_swap.rs +++ b/mm2src/mm2_main/src/lp_swap/maker_swap.rs @@ -31,6 +31,7 @@ use futures::{compat::Future01CompatExt, select, FutureExt}; use keys::KeyPair; use mm2_core::mm_ctx::MmArc; use mm2_err_handle::prelude::*; +use mm2_event_stream::DeriveStreamerId; use mm2_number::{BigDecimal, MmNumber}; use mm2_rpc::data::legacy::OrderConfirmationsSettings; use parking_lot::Mutex as PaMutex; @@ -2190,9 +2191,11 @@ pub async fn run_maker_swap(swap: RunMakerSwapInput, ctx: MmArc) { drop(dispatcher); // Send a notification to the swap status streamer about a new event. ctx.event_stream_manager - .send_fn(&SwapStatusStreamer::derive_streamer_id(), || SwapStatusEvent::MakerV1 { - uuid: running_swap.uuid, - event: to_save.clone(), + .send_fn(&SwapStatusStreamer::derive_streamer_id(()), || { + SwapStatusEvent::MakerV1 { + uuid: running_swap.uuid, + event: to_save.clone(), + } }) .ok(); save_my_maker_swap_event(&ctx, &running_swap, to_save) diff --git a/mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs b/mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs index 3783e43950..b4feca6a3e 100644 --- a/mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs +++ b/mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs @@ -23,6 +23,7 @@ use crypto::secret_hash_algo::SecretHashAlgo; use keys::KeyPair; use mm2_core::mm_ctx::MmArc; use mm2_err_handle::prelude::*; +use mm2_event_stream::DeriveStreamerId; use mm2_libp2p::Secp256k1PubkeySerialize; use mm2_number::MmNumber; use mm2_state_machine::prelude::*; @@ -792,9 +793,11 @@ impl Self { Self } +impl DeriveStreamerId<'_> for SwapStatusStreamer { + type InitParam = (); + type DeriveParam = (); - #[inline(always)] - pub const fn derive_streamer_id() -> StreamerId { StreamerId::SwapStatus } + fn new(_: Self::InitParam) -> Self { Self } + + fn derive_streamer_id(_: Self::DeriveParam) -> StreamerId { StreamerId::SwapStatus } } #[derive(Serialize)] @@ -32,7 +33,7 @@ pub enum SwapStatusEvent { impl EventStreamer for SwapStatusStreamer { type DataInType = SwapStatusEvent; - fn streamer_id(&self) -> StreamerId { Self::derive_streamer_id() } + fn streamer_id(&self) -> StreamerId { Self::derive_streamer_id(()) } async fn handle( self, diff --git a/mm2src/mm2_main/src/lp_swap/taker_swap.rs b/mm2src/mm2_main/src/lp_swap/taker_swap.rs index 1fdb9e728f..acb5be30a2 100644 --- a/mm2src/mm2_main/src/lp_swap/taker_swap.rs +++ b/mm2src/mm2_main/src/lp_swap/taker_swap.rs @@ -31,6 +31,7 @@ use http::Response; use keys::KeyPair; use mm2_core::mm_ctx::MmArc; use mm2_err_handle::prelude::*; +use mm2_event_stream::DeriveStreamerId; use mm2_number::{BigDecimal, MmNumber}; use mm2_rpc::data::legacy::{MatchBy, OrderConfirmationsSettings, TakerAction}; use parking_lot::Mutex as PaMutex; @@ -491,9 +492,11 @@ pub async fn run_taker_swap(swap: RunTakerSwapInput, ctx: MmArc) { // Send a notification to the swap status streamer about a new event. ctx.event_stream_manager - .send_fn(&SwapStatusStreamer::derive_streamer_id(), || SwapStatusEvent::TakerV1 { - uuid: running_swap.uuid, - event: to_save.clone(), + .send_fn(&SwapStatusStreamer::derive_streamer_id(()), || { + SwapStatusEvent::TakerV1 { + uuid: running_swap.uuid, + event: to_save.clone(), + } }) .ok(); save_my_taker_swap_event(&ctx, &running_swap, to_save) diff --git a/mm2src/mm2_main/src/lp_swap/taker_swap_v2.rs b/mm2src/mm2_main/src/lp_swap/taker_swap_v2.rs index 7f8270f139..856bf25af9 100644 --- a/mm2src/mm2_main/src/lp_swap/taker_swap_v2.rs +++ b/mm2src/mm2_main/src/lp_swap/taker_swap_v2.rs @@ -23,6 +23,7 @@ use crypto::secret_hash_algo::SecretHashAlgo; use keys::KeyPair; use mm2_core::mm_ctx::MmArc; use mm2_err_handle::prelude::*; +use mm2_event_stream::DeriveStreamerId; use mm2_libp2p::Secp256k1PubkeySerialize; use mm2_number::MmNumber; use mm2_state_machine::prelude::*; @@ -909,9 +910,11 @@ impl MmResult { - let order_status_streamer = OrderbookStreamer::new(ctx.clone(), req.base, req.rel); + let order_status_streamer = OrderbookStreamer::new((ctx.clone(), req.base, req.rel)); ctx.event_stream_manager .add(req.client_id, order_status_streamer, ctx.spawner()) .await diff --git a/mm2src/mm2_main/src/rpc/streaming_activations/orders.rs b/mm2src/mm2_main/src/rpc/streaming_activations/orders.rs index 08fd0a959a..2643921daf 100644 --- a/mm2src/mm2_main/src/rpc/streaming_activations/orders.rs +++ b/mm2src/mm2_main/src/rpc/streaming_activations/orders.rs @@ -3,6 +3,7 @@ use super::{EnableStreamingRequest, EnableStreamingResponse}; use crate::lp_ordermatch::order_events::OrderStatusStreamer; use mm2_core::mm_ctx::MmArc; use mm2_err_handle::{map_to_mm::MapToMmResult, mm_error::MmResult}; +use mm2_event_stream::DeriveStreamerId; use common::HttpStatusCode; use http::StatusCode; @@ -21,7 +22,7 @@ pub async fn enable_order_status( ctx: MmArc, req: EnableStreamingRequest<()>, ) -> MmResult { - let order_status_streamer = OrderStatusStreamer::new(); + let order_status_streamer = OrderStatusStreamer::new(()); ctx.event_stream_manager .add(req.client_id, order_status_streamer, ctx.spawner()) .await diff --git a/mm2src/mm2_main/src/rpc/streaming_activations/swaps.rs b/mm2src/mm2_main/src/rpc/streaming_activations/swaps.rs index 3d4aa2b93e..643db7b3c6 100644 --- a/mm2src/mm2_main/src/rpc/streaming_activations/swaps.rs +++ b/mm2src/mm2_main/src/rpc/streaming_activations/swaps.rs @@ -3,6 +3,7 @@ use super::{EnableStreamingRequest, EnableStreamingResponse}; use crate::lp_swap::swap_events::SwapStatusStreamer; use mm2_core::mm_ctx::MmArc; use mm2_err_handle::{map_to_mm::MapToMmResult, mm_error::MmResult}; +use mm2_event_stream::DeriveStreamerId; use common::HttpStatusCode; use http::StatusCode; @@ -26,7 +27,7 @@ pub async fn enable_swap_status( req: EnableStreamingRequest<()>, ) -> MmResult { ctx.event_stream_manager - .add(req.client_id, SwapStatusStreamer::new(), ctx.spawner()) + .add(req.client_id, SwapStatusStreamer::new(()), ctx.spawner()) .await .map(EnableStreamingResponse::new) .map_to_mm(|e| SwapStatusStreamingRequestError::EnableError(format!("{e:?}"))) diff --git a/mm2src/mm2_main/src/rpc/streaming_activations/tx_history.rs b/mm2src/mm2_main/src/rpc/streaming_activations/tx_history.rs index ac37ca21b5..1a4cb8fabd 100644 --- a/mm2src/mm2_main/src/rpc/streaming_activations/tx_history.rs +++ b/mm2src/mm2_main/src/rpc/streaming_activations/tx_history.rs @@ -8,6 +8,7 @@ use common::HttpStatusCode; use http::StatusCode; use mm2_core::mm_ctx::MmArc; use mm2_err_handle::{map_to_mm::MapToMmResult, mm_error::MmResult}; +use mm2_event_stream::DeriveStreamerId; #[derive(Deserialize)] pub struct EnableTxHistoryStreamingRequest { From 4b65bf6d6b1c68a5b0cad84d719745f1a19cb73c Mon Sep 17 00:00:00 2001 From: BigFish2086 Date: Wed, 16 Jul 2025 13:22:03 +0300 Subject: [PATCH 2/3] refactor(orderbook_events): introduce BaseAndRel type alias for DeriveParam #2441 Replaces the raw tuple (&str, &str) with a named type alias for clarity --- mm2src/mm2_main/src/lp_ordermatch/orderbook_events.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mm2src/mm2_main/src/lp_ordermatch/orderbook_events.rs b/mm2src/mm2_main/src/lp_ordermatch/orderbook_events.rs index dec9a8bb6f..4e61496a96 100644 --- a/mm2src/mm2_main/src/lp_ordermatch/orderbook_events.rs +++ b/mm2src/mm2_main/src/lp_ordermatch/orderbook_events.rs @@ -14,9 +14,10 @@ pub struct OrderbookStreamer { rel: String, } +type BaseAndRel<'a> = (&'a str, &'a str); impl<'a> DeriveStreamerId<'a> for OrderbookStreamer { type InitParam = (MmArc, String, String); - type DeriveParam = (&'a str, &'a str); + type DeriveParam = BaseAndRel<'a>; fn new((ctx, base, rel): Self::InitParam) -> Self { Self { ctx, base, rel } } From 14dc322dcfc0f822407269f71d0e10461ee87eae Mon Sep 17 00:00:00 2001 From: BigFish2086 Date: Wed, 16 Jul 2025 13:22:25 +0300 Subject: [PATCH 3/3] docs(event-streaming): add documentation for DeriveStreamerId trait and its associated types #2441 --- mm2src/mm2_event_stream/src/streamer.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/mm2src/mm2_event_stream/src/streamer.rs b/mm2src/mm2_event_stream/src/streamer.rs index 19066f3822..e6e5319a17 100644 --- a/mm2src/mm2_event_stream/src/streamer.rs +++ b/mm2src/mm2_event_stream/src/streamer.rs @@ -39,11 +39,22 @@ where ); } +/// Trait for types that can produce a unique [`StreamerId`] for event streaming. +/// +/// Used to standardize initialization and ID derivation for various streamers. +/// +/// - `'a`: Lifetime for borrowed derive parameters. pub trait DeriveStreamerId<'a> { + /// Type used to create the instance. type InitParam; + + /// Borrowed type used to derive the [`StreamerId`]. type DeriveParam: 'a; + /// Creates a new instance using the specified initialization parameter. fn new(param: Self::InitParam) -> Self; + + /// Derives a unique [`StreamerId`] based on the provided parameter. fn derive_streamer_id(param: Self::DeriveParam) -> StreamerId; }