diff --git a/mm2src/coins/eth/eth_balance_events.rs b/mm2src/coins/eth/eth_balance_events.rs index 0cb7afe134..18ab375d9d 100644 --- a/mm2src/coins/eth/eth_balance_events.rs +++ b/mm2src/coins/eth/eth_balance_events.rs @@ -3,7 +3,7 @@ use crate::{eth::{u256_to_big_decimal, Erc20TokenDetails}, BalanceError, CoinWithDerivationMethod}; use common::{executor::Timer, log, Future01CompatExt}; use mm2_err_handle::prelude::MmError; -use mm2_event_stream::{Broadcaster, Event, EventStreamer, NoDataIn, StreamHandlerInput}; +use mm2_event_stream::{Broadcaster, Event, EventStreamer, NoDataIn, StreamHandlerInput, StreamerId}; use mm2_number::BigDecimal; use async_trait::async_trait; @@ -146,7 +146,11 @@ async fn fetch_balance( impl EventStreamer for EthBalanceEventStreamer { type DataInType = NoDataIn; - fn streamer_id(&self) -> String { format!("BALANCE:{}", self.coin.ticker) } + fn streamer_id(&self) -> StreamerId { + StreamerId::Balance { + coin: self.coin.ticker.to_string(), + } + } async fn handle( self, @@ -154,7 +158,7 @@ impl EventStreamer for EthBalanceEventStreamer { ready_tx: oneshot::Sender>, _: impl StreamHandlerInput, ) { - async fn start_polling(streamer_id: String, broadcaster: Broadcaster, coin: EthCoin, interval: f64) { + async fn start_polling(streamer_id: StreamerId, broadcaster: Broadcaster, coin: EthCoin, interval: f64) { async fn sleep_remaining_time(interval: f64, now: Instant) { // If the interval is x seconds, // our goal is to broadcast changed balances every x seconds. diff --git a/mm2src/coins/eth/fee_estimation/eth_fee_events.rs b/mm2src/coins/eth/fee_estimation/eth_fee_events.rs index 0af1f13579..1c79e7da40 100644 --- a/mm2src/coins/eth/fee_estimation/eth_fee_events.rs +++ b/mm2src/coins/eth/fee_estimation/eth_fee_events.rs @@ -1,7 +1,7 @@ use super::ser::FeePerGasEstimated; use crate::eth::EthCoin; use common::executor::Timer; -use mm2_event_stream::{Broadcaster, Event, EventStreamer, NoDataIn, StreamHandlerInput}; +use mm2_event_stream::{Broadcaster, Event, EventStreamer, NoDataIn, StreamHandlerInput, StreamerId}; use async_trait::async_trait; use compatible_time::Instant; @@ -52,7 +52,11 @@ impl EthFeeEventStreamer { impl EventStreamer for EthFeeEventStreamer { type DataInType = NoDataIn; - fn streamer_id(&self) -> String { format!("FEE_ESTIMATION:{}", self.coin.ticker) } + fn streamer_id(&self) -> StreamerId { + StreamerId::FeeEstimation { + coin: self.coin.ticker.to_string(), + } + } async fn handle( self, diff --git a/mm2src/coins/tendermint/tendermint_balance_events.rs b/mm2src/coins/tendermint/tendermint_balance_events.rs index eed451b5dd..3304e35e97 100644 --- a/mm2src/coins/tendermint/tendermint_balance_events.rs +++ b/mm2src/coins/tendermint/tendermint_balance_events.rs @@ -3,7 +3,7 @@ use common::{http_uri_to_ws_address, log, PROXY_REQUEST_EXPIRATION_SEC}; use futures::channel::oneshot; use futures_util::{SinkExt, StreamExt}; use jsonrpc_core::{Id as RpcId, Params as RpcParams, Value as RpcValue, Version as RpcVersion}; -use mm2_event_stream::{Broadcaster, Event, EventStreamer, NoDataIn, StreamHandlerInput}; +use mm2_event_stream::{Broadcaster, Event, EventStreamer, NoDataIn, StreamHandlerInput, StreamerId}; use mm2_number::BigDecimal; use proxy_signature::RawMessage; use std::collections::{HashMap, HashSet}; @@ -23,7 +23,11 @@ impl TendermintBalanceEventStreamer { impl EventStreamer for TendermintBalanceEventStreamer { type DataInType = NoDataIn; - fn streamer_id(&self) -> String { format!("BALANCE:{}", self.coin.ticker()) } + fn streamer_id(&self) -> StreamerId { + StreamerId::Balance { + coin: self.coin.ticker().to_string(), + } + } async fn handle( self, diff --git a/mm2src/coins/utxo/tx_history_events.rs b/mm2src/coins/utxo/tx_history_events.rs index c336e6fbb0..e0404228a9 100644 --- a/mm2src/coins/utxo/tx_history_events.rs +++ b/mm2src/coins/utxo/tx_history_events.rs @@ -1,5 +1,5 @@ use crate::TransactionDetails; -use mm2_event_stream::{Broadcaster, Event, EventStreamer, StreamHandlerInput}; +use mm2_event_stream::{Broadcaster, Event, EventStreamer, StreamHandlerInput, StreamerId}; use async_trait::async_trait; use futures::channel::oneshot; @@ -14,14 +14,14 @@ impl TxHistoryEventStreamer { pub fn new(coin: String) -> Self { Self { coin } } #[inline(always)] - pub fn derive_streamer_id(coin: &str) -> String { format!("TX_HISTORY:{coin}") } + pub fn derive_streamer_id(coin: &str) -> StreamerId { StreamerId::TxHistory { coin: coin.to_string() } } } #[async_trait] impl EventStreamer for TxHistoryEventStreamer { type DataInType = Vec; - fn streamer_id(&self) -> String { Self::derive_streamer_id(&self.coin) } + fn streamer_id(&self) -> StreamerId { Self::derive_streamer_id(&self.coin) } async fn handle( self, diff --git a/mm2src/coins/utxo/utxo_balance_events.rs b/mm2src/coins/utxo/utxo_balance_events.rs index 8fdde86ab7..9451675cdb 100644 --- a/mm2src/coins/utxo/utxo_balance_events.rs +++ b/mm2src/coins/utxo/utxo_balance_events.rs @@ -12,7 +12,7 @@ use common::log; use futures::channel::oneshot; use futures::StreamExt; use keys::Address; -use mm2_event_stream::{Broadcaster, Event, EventStreamer, StreamHandlerInput}; +use mm2_event_stream::{Broadcaster, Event, EventStreamer, StreamHandlerInput, StreamerId}; use std::collections::{HashMap, HashSet}; macro_rules! try_or_continue { @@ -40,14 +40,18 @@ impl UtxoBalanceEventStreamer { } } - pub fn derive_streamer_id(coin: &str) -> String { format!("BALANCE:{coin}") } + pub fn derive_streamer_id(coin: &str) -> StreamerId { StreamerId::Balance { coin: coin.to_string() } } } #[async_trait] impl EventStreamer for UtxoBalanceEventStreamer { type DataInType = ScripthashNotification; - fn streamer_id(&self) -> String { format!("BALANCE:{}", self.coin.ticker()) } + fn streamer_id(&self) -> StreamerId { + StreamerId::Balance { + coin: self.coin.ticker().to_string(), + } + } async fn handle( self, diff --git a/mm2src/coins/z_coin/tx_history_events.rs b/mm2src/coins/z_coin/tx_history_events.rs index f374bc22b1..c09da5d732 100644 --- a/mm2src/coins/z_coin/tx_history_events.rs +++ b/mm2src/coins/z_coin/tx_history_events.rs @@ -4,7 +4,7 @@ use crate::utxo::rpc_clients::UtxoRpcError; use crate::MarketCoinOps; use common::log; use mm2_err_handle::prelude::MmError; -use mm2_event_stream::{Broadcaster, Event, EventStreamer, StreamHandlerInput}; +use mm2_event_stream::{Broadcaster, Event, EventStreamer, StreamHandlerInput, StreamerId}; use rpc::v1::types::H256 as H256Json; use async_trait::async_trait; @@ -23,14 +23,14 @@ impl ZCoinTxHistoryEventStreamer { pub fn new(coin: ZCoin) -> Self { Self { coin } } #[inline(always)] - pub fn derive_streamer_id(coin: &str) -> String { format!("TX_HISTORY:{coin}") } + pub fn derive_streamer_id(coin: &str) -> StreamerId { StreamerId::TxHistory { coin: coin.to_string() } } } #[async_trait] impl EventStreamer for ZCoinTxHistoryEventStreamer { type DataInType = Vec>; - fn streamer_id(&self) -> String { Self::derive_streamer_id(self.coin.ticker()) } + fn streamer_id(&self) -> StreamerId { Self::derive_streamer_id(self.coin.ticker()) } async fn handle( self, diff --git a/mm2src/coins/z_coin/tx_streaming_tests/native.rs b/mm2src/coins/z_coin/tx_streaming_tests/native.rs index f4bc2849dc..6cb7f9a97c 100644 --- a/mm2src/coins/z_coin/tx_streaming_tests/native.rs +++ b/mm2src/coins/z_coin/tx_streaming_tests/native.rs @@ -60,13 +60,13 @@ fn test_zcoin_tx_streaming() { .expect("tx history sender shutdown"); log!("{:?}", event.get()); - let (event_type, event_data) = event.get(); + let (streamer_id, event_data) = event.get(); // Make sure this is not an error event, - assert!(!event_type.starts_with("ERROR_")); + assert!(!streamer_id.starts_with("ERROR:")); // from the expected streamer, assert_eq!( - event_type, - ZCoinTxHistoryEventStreamer::derive_streamer_id(coin.ticker()) + streamer_id, + ZCoinTxHistoryEventStreamer::derive_streamer_id(coin.ticker()).to_string() ); // and has the expected data. assert_eq!(event_data["tx_hash"].as_str().unwrap(), tx.txid().to_string()); diff --git a/mm2src/coins/z_coin/z_balance_streaming.rs b/mm2src/coins/z_coin/z_balance_streaming.rs index 0760bfc929..c5b012fb3b 100644 --- a/mm2src/coins/z_coin/z_balance_streaming.rs +++ b/mm2src/coins/z_coin/z_balance_streaming.rs @@ -6,7 +6,7 @@ use async_trait::async_trait; use common::log::error; use futures::channel::oneshot; use futures_util::StreamExt; -use mm2_event_stream::{Broadcaster, Event, EventStreamer, StreamHandlerInput}; +use mm2_event_stream::{Broadcaster, Event, EventStreamer, StreamHandlerInput, StreamerId}; pub struct ZCoinBalanceEventStreamer { coin: ZCoin, @@ -17,14 +17,18 @@ impl ZCoinBalanceEventStreamer { pub fn new(coin: ZCoin) -> Self { Self { coin } } #[inline(always)] - pub fn derive_streamer_id(coin: &str) -> String { format!("BALANCE:{coin}") } + pub fn derive_streamer_id(coin: &str) -> StreamerId { StreamerId::Balance { coin: coin.to_string() } } } #[async_trait] impl EventStreamer for ZCoinBalanceEventStreamer { type DataInType = (); - fn streamer_id(&self) -> String { Self::derive_streamer_id(self.coin.ticker()) } + fn streamer_id(&self) -> StreamerId { + StreamerId::Balance { + coin: self.coin.ticker().to_string(), + } + } async fn handle( self, diff --git a/mm2src/mm2_core/src/data_asker.rs b/mm2src/mm2_core/src/data_asker.rs index 2e9a125d56..eba0158b24 100644 --- a/mm2src/mm2_core/src/data_asker.rs +++ b/mm2src/mm2_core/src/data_asker.rs @@ -5,7 +5,7 @@ use derive_more::Display; use futures::channel::oneshot; use futures::lock::Mutex as AsyncMutex; use mm2_err_handle::prelude::*; -use mm2_event_stream::Event; +use mm2_event_stream::{Event, StreamerId}; use ser_error_derive::SerializeErrorType; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; @@ -16,8 +16,6 @@ use timed_map::{MapKind, TimedMap}; use crate::mm_ctx::{MmArc, MmCtx}; -const EVENT_NAME: &str = "DATA_NEEDED"; - #[derive(Clone, Debug)] pub struct DataAsker { data_id: Arc, @@ -81,8 +79,12 @@ impl MmCtx { "data": data }); - self.event_stream_manager - .broadcast_all(Event::new(format!("{EVENT_NAME}:{data_type}"), input)); + self.event_stream_manager.broadcast_all(Event::new( + StreamerId::DataNeeded { + data_type: data_type.to_string(), + }, + input, + )); match receiver.timeout(timeout).await { Ok(Ok(response)) => match serde_json::from_value::(response) { diff --git a/mm2src/mm2_event_stream/src/event.rs b/mm2src/mm2_event_stream/src/event.rs index 306bbc9e49..d652b75f09 100644 --- a/mm2src/mm2_event_stream/src/event.rs +++ b/mm2src/mm2_event_stream/src/event.rs @@ -1,13 +1,13 @@ +use crate::StreamerId; use serde_json::Value as Json; // Note `Event` shouldn't be `Clone`able, but rather Arc/Rc wrapped and then shared. // This is only for testing. /// Multi-purpose/generic event type that can easily be used over the event streaming #[cfg_attr(any(test, target_arch = "wasm32"), derive(Clone, Debug, PartialEq))] -#[derive(Default)] pub struct Event { /// The type of the event (balance, network, swap, etc...). - event_type: String, + streamer_id: StreamerId, /// The message to be sent to the client. message: Json, /// Indicating whether this event is an error event or a normal one. @@ -17,9 +17,9 @@ pub struct Event { impl Event { /// Creates a new `Event` instance with the specified event type and message. #[inline(always)] - pub fn new(streamer_id: String, message: Json) -> Self { + pub fn new(streamer_id: StreamerId, message: Json) -> Self { Self { - event_type: streamer_id, + streamer_id, message, error: false, } @@ -27,21 +27,25 @@ impl Event { /// Create a new error `Event` instance with the specified error event type and message. #[inline(always)] - pub fn err(streamer_id: String, message: Json) -> Self { + pub fn err(streamer_id: StreamerId, message: Json) -> Self { Self { - event_type: streamer_id, + streamer_id, message, error: true, } } - /// Returns the `event_type` (the ID of the streamer firing this event). + /// Returns whether this event is an error or not #[inline(always)] - pub fn origin(&self) -> &str { &self.event_type } + pub fn is_error(&self) -> bool { self.error } + + /// Returns the `streamer_id` (the ID of the streamer firing this event). + #[inline(always)] + pub fn origin(&self) -> &StreamerId { &self.streamer_id } /// Returns the event type and message as a pair. pub fn get(&self) -> (String, &Json) { let prefix = if self.error { "ERROR:" } else { "" }; - (format!("{prefix}{}", self.event_type), &self.message) + (format!("{prefix}{}", self.streamer_id), &self.message) } } diff --git a/mm2src/mm2_event_stream/src/lib.rs b/mm2src/mm2_event_stream/src/lib.rs index db4587a77a..25a5758fff 100644 --- a/mm2src/mm2_event_stream/src/lib.rs +++ b/mm2src/mm2_event_stream/src/lib.rs @@ -2,9 +2,11 @@ pub mod configuration; pub mod event; pub mod manager; pub mod streamer; +pub mod streamer_ids; // Re-export important types. pub use configuration::EventStreamingConfiguration; pub use event::Event; pub use manager::{StreamingManager, StreamingManagerError}; pub use streamer::{Broadcaster, EventStreamer, NoDataIn, StreamHandlerInput}; +pub use streamer_ids::StreamerId; diff --git a/mm2src/mm2_event_stream/src/manager.rs b/mm2src/mm2_event_stream/src/manager.rs index b480ddd070..1bef846ef5 100644 --- a/mm2src/mm2_event_stream/src/manager.rs +++ b/mm2src/mm2_event_stream/src/manager.rs @@ -4,7 +4,7 @@ use std::ops::{Deref, DerefMut}; use std::sync::Arc; use crate::streamer::spawn; -use crate::{Event, EventStreamer}; +use crate::{Event, EventStreamer, StreamerId}; use common::executor::abortable_queue::WeakSpawner; use common::log::{error, LogOnError}; @@ -62,7 +62,7 @@ impl StreamerInfo { #[derive(Debug)] struct ClientInfo { /// The streamers the client is listening to. - listening_to: HashSet, + listening_to: HashSet, /// The communication/stream-out channel to the client. // NOTE: Here we are using `tokio`'s `mpsc` because the one in `futures` have some extra feature // (ref: https://users.rust-lang.org/t/why-does-try-send-from-crate-futures-require-mut-self/100389). @@ -80,11 +80,11 @@ impl ClientInfo { } } - fn add_streamer(&mut self, streamer_id: String) { self.listening_to.insert(streamer_id); } + fn add_streamer(&mut self, streamer_id: StreamerId) { self.listening_to.insert(streamer_id); } - fn remove_streamer(&mut self, streamer_id: &str) { self.listening_to.remove(streamer_id); } + fn remove_streamer(&mut self, streamer_id: &StreamerId) { self.listening_to.remove(streamer_id); } - fn listens_to(&self, streamer_id: &str) -> bool { self.listening_to.contains(streamer_id) } + fn listens_to(&self, streamer_id: &StreamerId) -> bool { self.listening_to.contains(streamer_id) } fn send_event(&self, event: Arc) { // Only `try_send` here. If the channel is full (client is slow), the message @@ -97,7 +97,7 @@ impl ClientInfo { #[derive(Default, Debug)] struct StreamingManagerInner { /// A map from streamer IDs to their communication channels (if present) and shutdown handles. - streamers: HashMap, + streamers: HashMap, /// An inverse map from client IDs to the streamers they are listening to and the communication channel with the client. clients: HashMap, } @@ -118,7 +118,7 @@ impl StreamingManager { client_id: u64, streamer: impl EventStreamer, spawner: WeakSpawner, - ) -> Result { + ) -> Result { let streamer_id = streamer.streamer_id(); // Remove the streamer if it died for some reason. self.remove_streamer_if_down(&streamer_id); @@ -173,7 +173,7 @@ impl StreamingManager { } /// Sends data to a streamer with `streamer_id`. - pub fn send(&self, streamer_id: &str, data: T) -> Result<(), StreamingManagerError> { + pub fn send(&self, streamer_id: &StreamerId, data: T) -> Result<(), StreamingManagerError> { let this = self.read(); let streamer_info = this .streamers @@ -192,7 +192,7 @@ impl StreamingManager { /// `data_fn` will only be evaluated if the streamer is found and accepts an input. pub fn send_fn( &self, - streamer_id: &str, + streamer_id: &StreamerId, data_fn: impl FnOnce() -> T, ) -> Result<(), StreamingManagerError> { let this = self.read(); @@ -207,7 +207,7 @@ impl StreamingManager { } /// Stops streaming from the streamer with `streamer_id` to the client with `client_id`. - pub fn stop(&self, client_id: u64, streamer_id: &str) -> Result<(), StreamingManagerError> { + pub fn stop(&self, client_id: u64, streamer_id: &StreamerId) -> Result<(), StreamingManagerError> { let mut this = self.write(); let client_info = this .clients @@ -312,7 +312,7 @@ impl StreamingManager { /// Aside from us shutting down a streamer when all its clients are disconnected, /// the streamer might die by itself (e.g. the spawner it was spawned with aborted). /// In this case, we need to remove the streamer and de-list it from all clients. - fn remove_streamer_if_down(&self, streamer_id: &str) { + fn remove_streamer_if_down(&self, streamer_id: &StreamerId) { let mut this = self.write(); let Some(streamer_info) = this.streamers.get(streamer_id) else { return; @@ -400,7 +400,12 @@ mod tests { let manager = StreamingManager::default(); let mut client1 = manager.new_client(1).unwrap(); let mut client2 = manager.new_client(2).unwrap(); - let event = Event::new("test".to_string(), json!("test")); + let event = Event::new( + StreamerId::ForTesting { + test_streamer: "test".to_string(), + }, + json!("test"), + ); // Broadcast the event to all clients. manager.broadcast_all(event.clone()); @@ -440,7 +445,7 @@ mod tests { // The streamer should send an event every 0.1s. Wait for 0.15s for safety. Timer::sleep(0.15).await; let event = client1.try_recv().unwrap(); - assert_eq!(event.origin(), streamer_id); + assert_eq!(event.origin(), &streamer_id); } // The other client shouldn't have received any events. @@ -472,7 +477,7 @@ mod tests { Timer::sleep(0.1).await; // The streamer should broadcast some event to the subscribed clients. let event = client1.try_recv().unwrap(); - assert_eq!(event.origin(), streamer_id); + assert_eq!(event.origin(), &streamer_id); // It's an echo streamer, so the message should be the same. assert_eq!(event.get().1, &json!(msg)); } diff --git a/mm2src/mm2_event_stream/src/streamer.rs b/mm2src/mm2_event_stream/src/streamer.rs index 6c319cb89c..dee79af8d1 100644 --- a/mm2src/mm2_event_stream/src/streamer.rs +++ b/mm2src/mm2_event_stream/src/streamer.rs @@ -1,6 +1,6 @@ use std::any::{self, Any}; -use crate::{Event, StreamingManager}; +use crate::{Event, StreamerId, StreamingManager}; use common::executor::{abortable_queue::WeakSpawner, AbortSettings, SpawnAbortable}; use common::log::{error, info}; @@ -25,7 +25,7 @@ where /// Returns a human readable unique identifier for the event streamer. /// No other event streamer should have the same identifier. - fn streamer_id(&self) -> String; + fn streamer_id(&self) -> StreamerId; /// Event handler that is responsible for broadcasting event data to the streaming channels. /// @@ -129,7 +129,11 @@ pub mod test_utils { impl EventStreamer for PeriodicStreamer { type DataInType = NoDataIn; - fn streamer_id(&self) -> String { "periodic_streamer".to_string() } + fn streamer_id(&self) -> StreamerId { + StreamerId::ForTesting { + test_streamer: "periodic_streamer".to_string(), + } + } async fn handle( self, @@ -152,7 +156,11 @@ pub mod test_utils { impl EventStreamer for ReactiveStreamer { type DataInType = String; - fn streamer_id(&self) -> String { "reactive_streamer".to_string() } + fn streamer_id(&self) -> StreamerId { + StreamerId::ForTesting { + test_streamer: "reactive_streamer".to_string(), + } + } async fn handle( self, @@ -175,7 +183,11 @@ pub mod test_utils { impl EventStreamer for InitErrorStreamer { type DataInType = NoDataIn; - fn streamer_id(&self) -> String { "init_error_streamer".to_string() } + fn streamer_id(&self) -> StreamerId { + StreamerId::ForTesting { + test_streamer: "init_error_streamer".to_string(), + } + } async fn handle( self, diff --git a/mm2src/mm2_event_stream/src/streamer_ids.rs b/mm2src/mm2_event_stream/src/streamer_ids.rs new file mode 100644 index 0000000000..d019b9a9e0 --- /dev/null +++ b/mm2src/mm2_event_stream/src/streamer_ids.rs @@ -0,0 +1,129 @@ +use serde::de::{self, Visitor}; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; +use std::fmt; + +const NETWORK: &str = "NETWORK"; +const HEARTBEAT: &str = "HEARTBEAT"; +const SWAP_STATUS: &str = "SWAP_STATUS"; +const ORDER_STATUS: &str = "ORDER_STATUS"; + +const TASK_PREFIX: &str = "TASK:"; +const BALANCE_PREFIX: &str = "BALANCE:"; +const TX_HISTORY_PREFIX: &str = "TX_HISTORY:"; +const FEE_ESTIMATION_PREFIX: &str = "FEE_ESTIMATION:"; +const DATA_NEEDED_PREFIX: &str = "DATA_NEEDED:"; +const ORDERBOOK_UPDATE_PREFIX: &str = "ORDERBOOK_UPDATE:"; +#[cfg(any(test, target_arch = "wasm32"))] +const FOR_TESTING_PREFIX: &str = "TEST_STREAMER:"; + +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +pub enum StreamerId { + Network, + Heartbeat, + SwapStatus, + OrderStatus, + Task { + task_id: u64, // TODO: should be TaskId (from rpc_task) + }, + Balance { + coin: String, + }, + DataNeeded { + data_type: String, + }, + TxHistory { + coin: String, + }, + FeeEstimation { + coin: String, + }, + OrderbookUpdate { + topic: String, + }, + #[cfg(any(test, target_arch = "wasm32"))] + ForTesting { + test_streamer: String, + }, +} + +impl fmt::Display for StreamerId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + StreamerId::Network => write!(f, "{}", NETWORK), + StreamerId::Heartbeat => write!(f, "{}", HEARTBEAT), + StreamerId::SwapStatus => write!(f, "{}", SWAP_STATUS), + StreamerId::OrderStatus => write!(f, "{}", ORDER_STATUS), + StreamerId::Task { task_id } => write!(f, "{}{}", TASK_PREFIX, task_id), + StreamerId::Balance { coin } => write!(f, "{}{}", BALANCE_PREFIX, coin), + StreamerId::TxHistory { coin } => write!(f, "{}{}", TX_HISTORY_PREFIX, coin), + StreamerId::FeeEstimation { coin } => write!(f, "{}{}", FEE_ESTIMATION_PREFIX, coin), + StreamerId::DataNeeded { data_type } => write!(f, "{}{}", DATA_NEEDED_PREFIX, data_type), + StreamerId::OrderbookUpdate { topic } => write!(f, "{}{}", ORDERBOOK_UPDATE_PREFIX, topic), + #[cfg(any(test, target_arch = "wasm32"))] + StreamerId::ForTesting { test_streamer } => write!(f, "{}{}", FOR_TESTING_PREFIX, test_streamer), + } + } +} + +impl Serialize for StreamerId { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_str(&self.to_string()) + } +} + +impl<'de> Deserialize<'de> for StreamerId { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + struct StreamerIdVisitor; + + impl<'de> Visitor<'de> for StreamerIdVisitor { + type Value = StreamerId; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("a string representing a StreamerId") + } + + fn visit_str(self, value: &str) -> Result + where + E: de::Error, + { + match value { + NETWORK => Ok(StreamerId::Network), + HEARTBEAT => Ok(StreamerId::Heartbeat), + SWAP_STATUS => Ok(StreamerId::SwapStatus), + ORDER_STATUS => Ok(StreamerId::OrderStatus), + v if v.starts_with(TASK_PREFIX) => Ok(StreamerId::Task { + task_id: v[TASK_PREFIX.len()..].parse().map_err(de::Error::custom)?, + }), + v if v.starts_with(BALANCE_PREFIX) => Ok(StreamerId::Balance { + coin: v[BALANCE_PREFIX.len()..].to_string(), + }), + v if v.starts_with(TX_HISTORY_PREFIX) => Ok(StreamerId::TxHistory { + coin: v[TX_HISTORY_PREFIX.len()..].to_string(), + }), + v if v.starts_with(FEE_ESTIMATION_PREFIX) => Ok(StreamerId::FeeEstimation { + coin: v[FEE_ESTIMATION_PREFIX.len()..].to_string(), + }), + v if v.starts_with(DATA_NEEDED_PREFIX) => Ok(StreamerId::DataNeeded { + data_type: v[DATA_NEEDED_PREFIX.len()..].to_string(), + }), + v if v.starts_with(ORDERBOOK_UPDATE_PREFIX) => Ok(StreamerId::OrderbookUpdate { + topic: v[ORDERBOOK_UPDATE_PREFIX.len()..].to_string(), + }), + #[cfg(any(test, target_arch = "wasm32"))] + v if v.starts_with(FOR_TESTING_PREFIX) => Ok(StreamerId::ForTesting { + test_streamer: v[FOR_TESTING_PREFIX.len()..].to_string(), + }), + _ => Err(de::Error::custom(format!("Invalid StreamerId: {}", value))), + } + } + } + + deserializer.deserialize_str(StreamerIdVisitor) + } +} diff --git a/mm2src/mm2_main/src/heartbeat_event.rs b/mm2src/mm2_main/src/heartbeat_event.rs index a2c46f2fb6..6645f407fe 100644 --- a/mm2src/mm2_main/src/heartbeat_event.rs +++ b/mm2src/mm2_main/src/heartbeat_event.rs @@ -1,7 +1,7 @@ use async_trait::async_trait; use common::executor::Timer; use futures::channel::oneshot; -use mm2_event_stream::{Broadcaster, Event, EventStreamer, NoDataIn, StreamHandlerInput}; +use mm2_event_stream::{Broadcaster, Event, EventStreamer, NoDataIn, StreamHandlerInput, StreamerId}; use serde::Deserialize; #[derive(Deserialize)] @@ -31,7 +31,7 @@ impl HeartbeatEvent { impl EventStreamer for HeartbeatEvent { type DataInType = NoDataIn; - fn streamer_id(&self) -> String { "HEARTBEAT".to_string() } + fn streamer_id(&self) -> StreamerId { StreamerId::Heartbeat } async fn handle( self, diff --git a/mm2src/mm2_main/src/lp_ordermatch.rs b/mm2src/mm2_main/src/lp_ordermatch.rs index 18475ce191..0d1576a210 100644 --- a/mm2src/mm2_main/src/lp_ordermatch.rs +++ b/mm2src/mm2_main/src/lp_ordermatch.rs @@ -3835,7 +3835,7 @@ async fn process_maker_reserved(ctx: MmArc, from_pubkey: H256Json, reserved_msg: }; ctx.event_stream_manager - .send_fn(OrderStatusStreamer::derive_streamer_id(), || { + .send_fn(&OrderStatusStreamer::derive_streamer_id(), || { OrderStatusEvent::TakerMatch(taker_match.clone()) }) .ok(); @@ -3890,7 +3890,7 @@ async fn process_maker_connected(ctx: MmArc, from_pubkey: PublicKey, connected: } ctx.event_stream_manager - .send_fn(OrderStatusStreamer::derive_streamer_id(), || { + .send_fn(&OrderStatusStreamer::derive_streamer_id(), || { OrderStatusEvent::TakerConnected(order_match.clone()) }) .ok(); @@ -4004,7 +4004,7 @@ async fn process_taker_request(ctx: MmArc, from_pubkey: H256Json, taker_request: }; ctx.event_stream_manager - .send_fn(OrderStatusStreamer::derive_streamer_id(), || { + .send_fn(&OrderStatusStreamer::derive_streamer_id(), || { OrderStatusEvent::MakerMatch(maker_match.clone()) }) .ok(); @@ -4075,7 +4075,7 @@ async fn process_taker_connect(ctx: MmArc, sender_pubkey: PublicKey, connect_msg let order_match = order_match.clone(); ctx.event_stream_manager - .send_fn(OrderStatusStreamer::derive_streamer_id(), || { + .send_fn(&OrderStatusStreamer::derive_streamer_id(), || { OrderStatusEvent::MakerConnected(order_match.clone()) }) .ok(); diff --git a/mm2src/mm2_main/src/lp_ordermatch/order_events.rs b/mm2src/mm2_main/src/lp_ordermatch/order_events.rs index 547ee7df4e..9e0fa97593 100644 --- a/mm2src/mm2_main/src/lp_ordermatch/order_events.rs +++ b/mm2src/mm2_main/src/lp_ordermatch/order_events.rs @@ -1,5 +1,5 @@ use super::{MakerMatch, TakerMatch}; -use mm2_event_stream::{Broadcaster, Event, EventStreamer, StreamHandlerInput}; +use mm2_event_stream::{Broadcaster, Event, EventStreamer, StreamHandlerInput, StreamerId}; use async_trait::async_trait; use futures::channel::oneshot; @@ -12,7 +12,7 @@ impl OrderStatusStreamer { pub fn new() -> Self { Self } #[inline(always)] - pub const fn derive_streamer_id() -> &'static str { "ORDER_STATUS" } + pub const fn derive_streamer_id() -> StreamerId { StreamerId::OrderStatus } } #[derive(Serialize)] @@ -28,7 +28,7 @@ pub enum OrderStatusEvent { impl EventStreamer for OrderStatusStreamer { type DataInType = OrderStatusEvent; - fn streamer_id(&self) -> String { Self::derive_streamer_id().to_string() } + fn streamer_id(&self) -> StreamerId { Self::derive_streamer_id() } async fn handle( self, diff --git a/mm2src/mm2_main/src/lp_ordermatch/orderbook_events.rs b/mm2src/mm2_main/src/lp_ordermatch/orderbook_events.rs index f7149bd05e..852da7fb1f 100644 --- a/mm2src/mm2_main/src/lp_ordermatch/orderbook_events.rs +++ b/mm2src/mm2_main/src/lp_ordermatch/orderbook_events.rs @@ -1,7 +1,7 @@ use super::{orderbook_topic_from_base_rel, subscribe_to_orderbook_topic, OrderbookP2PItem}; use coins::{is_wallet_only_ticker, lp_coinfind}; use mm2_core::mm_ctx::MmArc; -use mm2_event_stream::{Broadcaster, Event, EventStreamer, StreamHandlerInput}; +use mm2_event_stream::{Broadcaster, Event, EventStreamer, StreamHandlerInput, StreamerId}; use async_trait::async_trait; use futures::channel::oneshot; @@ -17,8 +17,10 @@ pub struct OrderbookStreamer { impl OrderbookStreamer { pub fn new(ctx: MmArc, base: String, rel: String) -> Self { Self { ctx, base, rel } } - pub fn derive_streamer_id(base: &str, rel: &str) -> String { - format!("ORDERBOOK_UPDATE/{}", orderbook_topic_from_base_rel(base, rel)) + pub fn derive_streamer_id(base: &str, rel: &str) -> StreamerId { + StreamerId::OrderbookUpdate { + topic: orderbook_topic_from_base_rel(base, rel), + } } } @@ -36,7 +38,7 @@ pub enum OrderbookItemChangeEvent { impl EventStreamer for OrderbookStreamer { type DataInType = OrderbookItemChangeEvent; - fn streamer_id(&self) -> String { Self::derive_streamer_id(&self.base, &self.rel) } + fn streamer_id(&self) -> StreamerId { Self::derive_streamer_id(&self.base, &self.rel) } async fn handle( self, diff --git a/mm2src/mm2_main/src/lp_swap/maker_swap.rs b/mm2src/mm2_main/src/lp_swap/maker_swap.rs index c7e5a43329..397e25aff8 100644 --- a/mm2src/mm2_main/src/lp_swap/maker_swap.rs +++ b/mm2src/mm2_main/src/lp_swap/maker_swap.rs @@ -2182,7 +2182,7 @@ pub async fn run_maker_swap(swap: RunMakerSwapInput, ctx: MmArc) { drop(dispatcher); // Send a notification to the swap status streamer about a new event. ctx.event_stream_manager - .send_fn(SwapStatusStreamer::derive_streamer_id(), || SwapStatusEvent::MakerV1 { + .send_fn(&SwapStatusStreamer::derive_streamer_id(), || SwapStatusEvent::MakerV1 { uuid: running_swap.uuid, event: to_save.clone(), }) diff --git a/mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs b/mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs index c4592fd494..3783e43950 100644 --- a/mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs +++ b/mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs @@ -792,7 +792,7 @@ impl Self { Self } #[inline(always)] - pub const fn derive_streamer_id() -> &'static str { "SWAP_STATUS" } + pub const fn derive_streamer_id() -> StreamerId { StreamerId::SwapStatus } } #[derive(Serialize)] @@ -32,7 +32,7 @@ pub enum SwapStatusEvent { impl EventStreamer for SwapStatusStreamer { type DataInType = SwapStatusEvent; - fn streamer_id(&self) -> String { Self::derive_streamer_id().to_string() } + fn streamer_id(&self) -> StreamerId { Self::derive_streamer_id() } async fn handle( self, diff --git a/mm2src/mm2_main/src/lp_swap/taker_swap.rs b/mm2src/mm2_main/src/lp_swap/taker_swap.rs index 54f5ab3bfe..4300c9b066 100644 --- a/mm2src/mm2_main/src/lp_swap/taker_swap.rs +++ b/mm2src/mm2_main/src/lp_swap/taker_swap.rs @@ -485,7 +485,7 @@ pub async fn run_taker_swap(swap: RunTakerSwapInput, ctx: MmArc) { // Send a notification to the swap status streamer about a new event. ctx.event_stream_manager - .send_fn(SwapStatusStreamer::derive_streamer_id(), || SwapStatusEvent::TakerV1 { + .send_fn(&SwapStatusStreamer::derive_streamer_id(), || SwapStatusEvent::TakerV1 { uuid: running_swap.uuid, event: to_save.clone(), }) diff --git a/mm2src/mm2_main/src/lp_swap/taker_swap_v2.rs b/mm2src/mm2_main/src/lp_swap/taker_swap_v2.rs index eea040316d..7f8270f139 100644 --- a/mm2src/mm2_main/src/lp_swap/taker_swap_v2.rs +++ b/mm2src/mm2_main/src/lp_swap/taker_swap_v2.rs @@ -909,7 +909,7 @@ impl { /// The success/ok response for any event streaming activation request. #[derive(Serialize)] pub struct EnableStreamingResponse { - pub streamer_id: String, + pub streamer_id: StreamerId, // TODO: If the the streamer was already running, it is probably running with different configuration. // We might want to inform the client that the configuration they asked for wasn't applied and return // the active configuration instead? @@ -41,5 +43,5 @@ pub struct EnableStreamingResponse { } impl EnableStreamingResponse { - fn new(streamer_id: String) -> Self { Self { streamer_id } } + fn new(streamer_id: StreamerId) -> Self { Self { streamer_id } } } diff --git a/mm2src/mm2_p2p/src/application/network_event.rs b/mm2src/mm2_p2p/src/application/network_event.rs index fa152469d1..bdab12f1dc 100644 --- a/mm2src/mm2_p2p/src/application/network_event.rs +++ b/mm2src/mm2_p2p/src/application/network_event.rs @@ -1,6 +1,6 @@ use common::executor::Timer; use mm2_core::mm_ctx::MmArc; -use mm2_event_stream::{Broadcaster, Event, EventStreamer, NoDataIn, StreamHandlerInput}; +use mm2_event_stream::{Broadcaster, Event, EventStreamer, NoDataIn, StreamHandlerInput, StreamerId}; use async_trait::async_trait; use futures::channel::oneshot; @@ -38,7 +38,7 @@ impl NetworkEvent { impl EventStreamer for NetworkEvent { type DataInType = NoDataIn; - fn streamer_id(&self) -> String { "NETWORK".to_string() } + fn streamer_id(&self) -> StreamerId { StreamerId::Network } async fn handle( self, diff --git a/mm2src/rpc_task/src/manager.rs b/mm2src/rpc_task/src/manager.rs index b5c43a04b6..232c8efaef 100644 --- a/mm2src/rpc_task/src/manager.rs +++ b/mm2src/rpc_task/src/manager.rs @@ -6,7 +6,7 @@ use common::log::{debug, info, trace, warn}; use futures::channel::oneshot; use futures::future::{select, Either}; use mm2_err_handle::prelude::*; -use mm2_event_stream::{Event, StreamingManager, StreamingManagerError}; +use mm2_event_stream::{Event, StreamerId, StreamingManager, StreamingManagerError}; use std::collections::hash_map::Entry; use std::collections::HashMap; use std::sync::atomic::Ordering; @@ -192,7 +192,7 @@ impl RpcTaskManager { // Note that this should really always be `Some`, since we updated the status *successfully*. if let Some(new_status) = self.task_status(task_id, false) { let event = Event::new( - format!("TASK:{task_id}"), + StreamerId::Task { task_id }, serde_json::to_value(new_status).expect("Serialization shouldn't fail."), ); if let Err(e) = self.streaming_manager.broadcast_to(event, client_id) {