feat(event-streaming): API-driven subscription management#2172
feat(event-streaming): API-driven subscription management#2172
Conversation
it was confusing why a utxo standard coin was being used instead of the coin we are building, this makes a bit more sense
There was no point of the option wrapping. Simplifying stuff.
also does some housekeeping, file splitting and renaming
remove the intermediate bridge task and replace it with `filter_map` call. also properly implement the shutdown handle
also wraps the streamermanager mutable data inside Arc<RwLock< so to be used in concurrent code. RwLock was chosen here since we should be sending streaming data way more than editing the streamers
along with some cleanups and simplifications
Also use the `data_rx` for utxo balance events. A cleanup for the scripthash notification channels/logic will follow
this is very messy though (because of the current state of electurm code) and will be reverted, just recording this in the history for now
…nterface" This reverts commit 8972822.
made more sense to have it inside the manager. also now the manager provides the controller as a parameter in the handle method so different streamers don't have to keep a copy of ctx just to broadcast on the controller.
this is still not hooked up though. no data will be received on this rx channel till we do
`event_stream_manager.send('BALANCE:ZCoin') (or whatever the coin name is)` somewhere.
|
@mariocynicys could you fix conflicts please |
as these won't be addressed in this PR
shamardy
left a comment
There was a problem hiding this comment.
Huge Work! All comments are for discussion and non blockers.
A few items to consider:
- Similar to how we test RPCs we need to add some integration tests for each streamer type and streaming in general, we may need to implement a SSE reader / EventSource for these tests
- Please resolve conflicts
- test_zcoin_tx_streaming is failing in wasm
- Please open docs issue
- Once this PR is merged, we need to check open PRs for streaming coverage like the WalletConnect PR as an example
| // TODO: If the the streamer was already running, it is probably running with different configuration. | ||
| // We might want to inform the client that the configuration they asked for wasn't applied and return | ||
| // the active configuration instead? | ||
| // pub config: Json, |
There was a problem hiding this comment.
It would still be strange for the first client to subscribe to set the config. I see that you postponed this as multi client support completion is not a high priority currently, we can think about better solutions for this in the future.
| MmCoinEnum::EthCoin(_) => (), | ||
| MmCoinEnum::ZCoin(_) | ||
| | MmCoinEnum::UtxoCoin(_) | ||
| | MmCoinEnum::Bch(_) | ||
| | MmCoinEnum::QtumCoin(_) | ||
| | MmCoinEnum::Tendermint(_) => { | ||
| if req.config.is_some() { | ||
| Err(BalanceStreamingRequestError::EnableError( | ||
| "Invalid config provided. No config needed".to_string(), | ||
| ))? | ||
| } | ||
| }, | ||
| _ => Err(BalanceStreamingRequestError::CoinNotSupported)?, |
There was a problem hiding this comment.
I guess we will need this and other streamers for SIA in the future for GUIs to fully support balance streaming c.c. @Alrighttt
There was a problem hiding this comment.
added to the streaming tracking list
| let streamer = ZCoinTxHistoryEventStreamer::new(coin.clone()); | ||
| ctx.event_stream_manager.add(client_id, streamer, coin.spawner()).await | ||
| }, | ||
| _ => Err(TxHistoryStreamingRequestError::CoinNotSupported)?, |
There was a problem hiding this comment.
SIA will need to be added here as well c.c. @Alrighttt
There was a problem hiding this comment.
added to the streaming tracking list
* dev: feat(event-streaming): API-driven subscription management (#2172)
Refactor event streaming to support dynamic client subscriptions over RPC using a unified StreamingManager. The StreamingManager now orchestrates background streamers by initializing a streamer when a client activates it via the RPC API and shutting it down when no longer needed. Legacy fee estimator endpoints have been replaced with streaming RPC methods, and new task manager RPCs for BCH and Tendermint have been added. Additionally, event stream configuration has been migrated from static JSON settings to runtime API initialization for improved flexibility.
* dev: fix(derive_key_from_path): check length of current_key_material (#2356) chore(release): bump mm2 version to 2.4.0-beta (#2346) fix(tests): add additional testnet sepolia nodes to test code (#2358) fix(swaps): maintain legacy compatibility for negotiation messages (#2353) refactor(SwapOps): impl defaults for protocol specific swapops fns (#2354) feat(tpu-v2): provide swap protocol versioning (#2324) feat(wallet): add change mnemonic password rpc (#2317) fix(tpu-v2): fix tpu-v2 wait for payment spend and extract secret (#2261) feat(tendermint): unstaking/undelegation (#2330) fix(utxo-withdraw): get hw ctx only when `PrivKeyPolicy` is trezor (#2333) feat(event-streaming): API-driven subscription management (#2172) fix(hash-types): remove panic, enforce fixed-size arrays (#2279) fix(ARRR): store unconfirmed change output (#2276) feat(tendermint): staking/delegation (#2322) chore(deps): `timed-map` migration (#2247) fix(mem-leak): `running_swap` never shrinks (#2301) chore(dep-bump): libp2p (#2326) refactor(build script): rewrite the main build script (#2319)
* dev: fix(derive_key_from_path): check length of current_key_material (#2356) chore(release): bump mm2 version to 2.4.0-beta (#2346) fix(tests): add additional testnet sepolia nodes to test code (#2358) fix(swaps): maintain legacy compatibility for negotiation messages (#2353) refactor(SwapOps): impl defaults for protocol specific swapops fns (#2354) feat(tpu-v2): provide swap protocol versioning (#2324) feat(wallet): add change mnemonic password rpc (#2317) fix(tpu-v2): fix tpu-v2 wait for payment spend and extract secret (#2261) feat(tendermint): unstaking/undelegation (#2330) fix(utxo-withdraw): get hw ctx only when `PrivKeyPolicy` is trezor (#2333) feat(event-streaming): API-driven subscription management (#2172) fix(hash-types): remove panic, enforce fixed-size arrays (#2279) fix(ARRR): store unconfirmed change output (#2276) feat(tendermint): staking/delegation (#2322) chore(deps): `timed-map` migration (#2247) fix(mem-leak): `running_swap` never shrinks (#2301) chore(dep-bump): libp2p (#2326) refactor(build script): rewrite the main build script (#2319)
* dev: (24 commits) fix(eth-tpu): remove state from funding validation (GLEECBTC#2334) improvement(rpc-server): rpc server dynamic port allocation (GLEECBTC#2342) fix(tests): fix or ignore unstable tests (GLEECBTC#2365) fix(fs): make `filter_files_by_extension` return only files (GLEECBTC#2364) fix(derive_key_from_path): check length of current_key_material (GLEECBTC#2356) chore(release): bump mm2 version to 2.4.0-beta (GLEECBTC#2346) fix(tests): add additional testnet sepolia nodes to test code (GLEECBTC#2358) fix(swaps): maintain legacy compatibility for negotiation messages (GLEECBTC#2353) refactor(SwapOps): impl defaults for protocol specific swapops fns (GLEECBTC#2354) feat(tpu-v2): provide swap protocol versioning (GLEECBTC#2324) feat(wallet): add change mnemonic password rpc (GLEECBTC#2317) fix(tpu-v2): fix tpu-v2 wait for payment spend and extract secret (GLEECBTC#2261) feat(tendermint): unstaking/undelegation (GLEECBTC#2330) fix(utxo-withdraw): get hw ctx only when `PrivKeyPolicy` is trezor (GLEECBTC#2333) feat(event-streaming): API-driven subscription management (GLEECBTC#2172) fix(hash-types): remove panic, enforce fixed-size arrays (GLEECBTC#2279) fix(ARRR): store unconfirmed change output (GLEECBTC#2276) feat(tendermint): staking/delegation (GLEECBTC#2322) chore(deps): `timed-map` migration (GLEECBTC#2247) fix(mem-leak): `running_swap` never shrinks (GLEECBTC#2301) ...
This PR aims to support event streaming in some of MM2's API endpoints.
The work in this PR till now is only a refactor of the event streaming code after multiple iterations of discussing how this should end up looking like.
The flow goes as follows:
The role of the
StreamingManager:cidwhen it first connects through SSE).cid,sid) pair (translates to: client with idcidwants to listen to the streamer with idsid). If such a streamer withsidis already running, it is just instructed that a new client is gonna be listening along, otherwise, it will be spawned in the background.cid,sid) pair (= client withcidno longer wants to receive events fromstreamerwith idsid, if no more clients are listening, the streamer will die).StreamingManager::send(streamer_id, arbitrary_data)is used.Breaking Changes:
event_stream_configurationin MM2 json config. Requested streams are initialized and configured dynamically through the API.event_stream_configuration.access_control_allow_originconfig was moved toaccess_control_allow_origin(one scope out) b9d1218.event_stream_configuration.worker_pathwas changed toevent_stream_worker_path(one scope out) ee32fd1.mm2.com/event-stream?filter=NETWORK,BALANCE,DATA_NEEDED:datatype->mm2.com/event-stream?id=1), but a client id is now required (a random u64 identifier for [& generated by] the client) (e.g.mm2.com/event-stream?id=799384531)stream::enable::balance).broadcast_all). example of these events areDATA_NEEDED:datatype, andHEARTBEATshould be similar later as well.(so for keplr integration, only
?id=<RANDOM_U64>must be added to the SSE init request and everything should work.?filter=shall be removed)(the
<RANDOM_U64>should be stored by the client since it's required in any event streamer activation/deactivation. If for keplr only though, this isn't necessary for now sinceDATA_NEEDED:datatypeevent streamer is a special event that's broadcasted to all clients and doesn't require any sort of activation/subscription before hand)