improvement(event-streaming): strong type streamer IDs#2441
improvement(event-streaming): strong type streamer IDs#2441onur-ozkan merged 10 commits intoGLEECBTC:devfrom
Conversation
There was a problem hiding this comment.
Pull Request Overview
This PR refactors the event streaming system to use a strongly typed StreamerId enum rather than plain strings for streamer identifiers. Key changes include updating various event streamer implementations, responses, and manager functions across the codebase to adopt the new StreamerId type, and adding a new StreamerId enum with dedicated variants.
Reviewed Changes
Copilot reviewed 21 out of 21 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| mm2src/mm2_p2p/src/application/network_event.rs | Changed streamer_id return type to StreamerId for Network events. |
| mm2src/mm2_main/src/rpc/streaming_activations/mod.rs/disable.rs | Updated request/response types from String to StreamerId. |
| mm2src/mm2_main/src/* (lp_swap, orderbook_events, etc.) | Replaced String-based streamer ids with strongly typed StreamerId. |
| mm2src/mm2_event_stream/src/streamer.rs | Introduced the new StreamerId enum and updated Display/Debug impls. |
| mm2src/mm2_event_stream/src/event.rs | Changed event handling to use StreamerId instead of raw strings. |
| mm2src/coins/* | Updated coin-related event streamers to use StreamerId for balance, fee, tx history, etc. |
…BTC#2207 used enumeration to represent the IDs of the different types of streamers that are currently supported.
52a34a5 to
80b19c2
Compare
|
|
||
| fn streamer_id(&self) -> String { Self::derive_streamer_id().to_string() } | ||
| fn streamer_id(&self) -> StreamerId { Self::derive_streamer_id().clone() } | ||
|
|
There was a problem hiding this comment.
I can tell this that this method will be called frequently. Can you refactor StreamerId enum to something like this
#[derive(Debug)]
pub enum StreamerIdInner {
Network,
Heartbeat,
SwapStatus,
OrderStatus,
Task(u64),
Balance(String),
DataNeeded(String),
TxHistory(String),
FeeEstimation(String),
OrderbookUpdate(String),
#[cfg(test)]
ForTesting(String),
}
pub type StreamerId = Arc<StreamerIdInner>;There was a problem hiding this comment.
seems good, I have tried this idea and pushed the changes.
|
|
||
| #[inline(always)] | ||
| pub const fn derive_streamer_id() -> &'static str { "ORDER_STATUS" } | ||
| pub const fn derive_streamer_id() -> &'static StreamerId { &StreamerId::OrderStatus } |
There was a problem hiding this comment.
derive_streamer_id returns Owned value in some places while static value in another place. Let's stick to one
There was a problem hiding this comment.
I changed it to return StreamerId in all places.
|
Please add comments to related docs PR GLEECBTC/komodo-docs-mdx#457 if this includes any changes to streamer_id format etc. so I can update examples. |
e3c3f64 to
702befe
Compare
mariocynicys
left a comment
There was a problem hiding this comment.
Thanks
done a quick review over the last commit. will review thoroughly next week.
`derive_streamer_id` was returning `StreamerId` in some places and `&'static StreamerId` in others, so it's now fixed to return `StreamerId` in every place.
9cb39a2 to
fd72859
Compare
…ers GLEECBTC#2441 - Added DeriveStreamerId trait with InitParam for new and DeriveParam for derive_streamer_id, including lifetime 'a for flexible references. - Refactored streamer structs to use &str for DeriveParam where applicable.
mariocynicys
left a comment
There was a problem hiding this comment.
Thanks! ❤️
Done reviewing everything. Will just follow up with my comments and approve when they are done.
| #[derive(Clone, Deserialize, Eq, Hash, PartialEq, Serialize)] | ||
| pub enum StreamerId { | ||
| Network, | ||
| Heartbeat, | ||
| SwapStatus, | ||
| OrderStatus, | ||
| Task(u64), // XXX: should be TaskId (from rpc_task) | ||
| Balance(String), | ||
| DataNeeded(String), | ||
| TxHistory(String), | ||
| FeeEstimation(String), | ||
| OrderbookUpdate(String), | ||
| #[cfg(test)] | ||
| ForTesting(String), | ||
| } | ||
|
|
||
| impl fmt::Display for StreamerId { | ||
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
| match self { | ||
| StreamerId::Network => write!(f, "NETWORK"), | ||
| StreamerId::Heartbeat => write!(f, "HEARTBEAT"), | ||
| StreamerId::SwapStatus => write!(f, "SWAP_STATUS"), | ||
| StreamerId::OrderStatus => write!(f, "ORDER_STATUS"), | ||
| StreamerId::Task(task_id) => write!(f, "TASK:{task_id}"), | ||
| StreamerId::Balance(coin) => write!(f, "BALANCE:{coin}"), |
There was a problem hiding this comment.
so the main reason i didn't wanna do this in the original PR was that this introduces mm2 specific lingo/logic in this crate (mm2_event_stream) which is trying to be abstract.
there looks to be no way around it though - if we wanna strong type streamer ids and avoid possible dev errors.
could you please move this logic to a different isolated/new file. as it doens't really fit in streamer.rs. maybe call it streamer_ids.rs.
There was a problem hiding this comment.
I moved it to its own file, streamer_ids.rs as suggested in 044d932. I have thought about the how to make the Event more generic like this:
pub struct Event<'m, 's, M, S>
where
M: Clone + Debug + Deserialize<'m> + Serialize,
S: Clone + Debug + Deserialize<'s> + Eq + Hash + PartialEq + Serialize,
{
pub streamer_id: S,
pub message: M,
pub error: bool,
_phantom_streamer_id: PhantomData<&'s ()>,
_phantom_message: PhantomData<&'m ()>,
}
pub type Event01<'m, 's> = Event<'m, 's, StreamerId, serde_json::Value>;But that would require us using the lifetimes all over the code and I thought it would be better to discuss it more before implementing it (in another PR maybe.)
There was a problem hiding this comment.
Q about this suggestion, using S and M as generics in the event type essentially creates multiple event types for each S & M pair.
we could ofc then dyn things up and box them to be able to work with them from the streaming manager side. but just saying.
…er_ids.rs
- move StreamerId from streamer.rs to streamer_ids.rs
- use the default Debug implementation for StreamerId
- use custom serialization and deserialization for StreamerId
e.g. StreamerId::Balance(String::from("ETH")) will look like BALANCE:ETH instead of {"Balance":"ETH"}
| use std::fmt; | ||
|
|
||
| #[derive(Clone, Debug, Eq, Hash, PartialEq)] | ||
| pub enum StreamerId { |
There was a problem hiding this comment.
Why not make this streamer list dynamic and agnostic about existing event sources?
I think, modules, which want to send events, may register themselves as event sources in the streamer. In register call they provide event prefixes. In response the streamer would return generated streamer_id which is used as a handle to streamer functions.
With that, we may add new modules as event sources, without touching the streamer code.
@mariocynicys
There was a problem hiding this comment.
so let's say we enable the orderstatus streamer. it registers itself in such list and gets an ID back. 1- what's the type of this ID? 2- how other parts of the code would know this ID so to send data to the streamer (e.g.)
… fix OrderbookUpdate format - Moved all StreamerId string constants to module-level scope for reuse across Display and Deserialize implementations. - Fixed inconsistency in OrderbookUpdate variant string format (was using "/" instead of ":"). - Improved test variant handling using FOR_TESTING_PREFIX under #[cfg(test)].
…nts for better clarity
Replaces `#[cfg(test)]` with `#[cfg(any(test, target_arch = "wasm32"))]` so that the ForTesting variant is compiled when targeting WebAssembly.
There was a problem hiding this comment.
LGTM!
@BigFish2086 @mariocynicys please leave a todo in the issue #2207 or create a new one about optimisations were discussed in the thread #2441 (comment)
@BigFish2086 good start!
…ers GLEECBTC#2441 - Added DeriveStreamerId trait with InitParam for new and DeriveParam for derive_streamer_id, including lifetime 'a for flexible references. - Refactored streamer structs to use &str for DeriveParam where applicable.
…ers GLEECBTC#2441 - Added DeriveStreamerId trait with InitParam for new and DeriveParam for derive_streamer_id, including lifetime 'a for flexible references. - Refactored streamer structs to use &str for DeriveParam where applicable.
* dev: (30 commits) chore(core): replace hash_raw_entry with stable entry() API (GLEECBTC#2473) chore(core): adapt `MmError` and usages for compatibility with new rustc versions (GLEECBTC#2443) feat(wallet): add `delete_wallet` RPC (GLEECBTC#2497) chore(release): add changelog entries for v2.5.0-beta (GLEECBTC#2494) chore(release): bump kdf version to 2.5.0-beta (GLEECBTC#2492) feat(tests): zcoin unit test to validate dex fee (GLEECBTC#2460) fix(zcoin): correctly track unconfirmed z-coin notes (GLEECBTC#2331) improvement(orders): remove BTC specific volume from min_trading_vol logic (GLEECBTC#2483) feat(ibc-routing-part-2): supporting entire Cosmos network for swaps (GLEECBTC#2476) fix(startup): don't initialize WalletConnect during startup (GLEECBTC#2485) fix(dns): better ip resolution (GLEECBTC#2487) improvement(event-streaming): strong type streamer IDs (GLEECBTC#2441) bump timed-map to `1.4.1` (GLEECBTC#2481) improvement(RPC): unified interface for legacy and current RPC interfaces (GLEECBTC#2450) improvement(tendermint): `tendermint_tx_internal_id` helper (GLEECBTC#2438) feat(walletconnect): add WalletConnect v2 support for EVM and Cosmos (GLEECBTC#2223) feat(ibc-routing-part-1): supporting entire Cosmos network for swaps (GLEECBTC#2459) fix(test): fix HD Wallet message signing tests (GLEECBTC#2474) improvement(builds): enable static CRT linking for MSVC builds (GLEECBTC#2464) feat(wallet): implement HD multi-address support for message signing (GLEECBTC#2432) ... # Conflicts: # mm2src/coins/qrc20.rs # mm2src/mm2_main/src/lp_swap/maker_swap.rs
…eParam GLEECBTC#2441 Replaces the raw tuple (&str, &str) with a named type alias for clarity
…nd its associated types GLEECBTC#2441
…eamers (#2489) * refactor(event-streaming): impl DeriveStreamerId trait for all streamers #2441 - Added DeriveStreamerId trait with InitParam for new and DeriveParam for derive_streamer_id, including lifetime 'a for flexible references. - Refactored streamer structs to use &str for DeriveParam where applicable. * refactor(orderbook_events): introduce BaseAndRel type alias for DeriveParam #2441 Replaces the raw tuple (&str, &str) with a named type alias for clarity * docs(event-streaming): add documentation for DeriveStreamerId trait and its associated types #2441 --------- Co-authored-by: BigFish2086 <a8686.ibrahim@gmail.com>
…eamers (#2489) * refactor(event-streaming): impl DeriveStreamerId trait for all streamers #2441 - Added DeriveStreamerId trait with InitParam for new and DeriveParam for derive_streamer_id, including lifetime 'a for flexible references. - Refactored streamer structs to use &str for DeriveParam where applicable. * refactor(orderbook_events): introduce BaseAndRel type alias for DeriveParam #2441 Replaces the raw tuple (&str, &str) with a named type alias for clarity * docs(event-streaming): add documentation for DeriveStreamerId trait and its associated types #2441 --------- Co-authored-by: BigFish2086 <a8686.ibrahim@gmail.com>
(#2207): Refactored the Streamer Id to its own enumeration type.