From c6c66d74ff39ee0a82f2bd92469d31f1f6901335 Mon Sep 17 00:00:00 2001 From: Omer Yacine Date: Tue, 29 Apr 2025 16:46:05 +0300 Subject: [PATCH 1/6] do not error in address balance subscription or notification if the streamer isn't enabled check utxo_prepare_addresses_for_balance_stream_if_enabled for context. we should not error when the streamer isn't enabled. we now silently proceed. --- .../utxo/rpc_clients/electrum_rpc/client.rs | 32 ++++++++++--------- .../connection_manager/manager.rs | 4 +-- 2 files changed, 19 insertions(+), 17 deletions(-) diff --git a/mm2src/coins/utxo/rpc_clients/electrum_rpc/client.rs b/mm2src/coins/utxo/rpc_clients/electrum_rpc/client.rs index 84e39997e4..e7c5311db2 100644 --- a/mm2src/coins/utxo/rpc_clients/electrum_rpc/client.rs +++ b/mm2src/coins/utxo/rpc_clients/electrum_rpc/client.rs @@ -51,7 +51,7 @@ use futures::stream::FuturesUnordered; use futures::StreamExt; use futures01::Future; use itertools::Itertools; -use mm2_event_stream::StreamingManager; +use mm2_event_stream::{StreamingManager, StreamingManagerError}; use serde_json::{self as json, Value as Json}; type ElectrumTxHistory = Vec; @@ -179,26 +179,28 @@ impl ElectrumClientImpl { /// Sends a list of addresses through the scripthash notification sender to subscribe to their scripthash notifications. pub fn subscribe_addresses(&self, addresses: HashSet
) -> Result<(), String> { - self.streaming_manager - .send( - &UtxoBalanceEventStreamer::derive_streamer_id(&self.coin_ticker), - ScripthashNotification::SubscribeToAddresses(addresses), - ) - .map_err(|e| ERRL!("Failed sending scripthash message. {:?}", e))?; - Ok(()) + match self.streaming_manager.send( + &UtxoBalanceEventStreamer::derive_streamer_id(&self.coin_ticker), + ScripthashNotification::SubscribeToAddresses(addresses), + ) { + // Don't error if the streamer isn't found/enabled. + Err(StreamingManagerError::StreamerNotFound) | Ok(()) => Ok(()), + Err(e) => Err(format!("Failed sending scripthash message. {:?}", e)), + } } /// Notifies the Utxo balance streamer of a new script hash balance change. /// /// The streamer will figure out which address this scripthash belongs to and will broadcast an notification to clients. pub fn notify_triggered_hash(&self, script_hash: String) -> Result<(), String> { - self.streaming_manager - .send( - &UtxoBalanceEventStreamer::derive_streamer_id(&self.coin_ticker), - ScripthashNotification::Triggered(script_hash), - ) - .map_err(|e| ERRL!("Failed sending scripthash message. {:?}", e))?; - Ok(()) + match self.streaming_manager.send( + &UtxoBalanceEventStreamer::derive_streamer_id(&self.coin_ticker), + ScripthashNotification::Triggered(script_hash), + ) { + // Don't error if the streamer isn't found/enabled. + Err(StreamingManagerError::StreamerNotFound) | Ok(()) => Ok(()), + Err(e) => Err(format!("Failed sending scripthash message. {:?}", e)), + } } /// Get block headers storage. diff --git a/mm2src/coins/utxo/rpc_clients/electrum_rpc/connection_manager/manager.rs b/mm2src/coins/utxo/rpc_clients/electrum_rpc/connection_manager/manager.rs index b06628fd60..a3b792e9f6 100644 --- a/mm2src/coins/utxo/rpc_clients/electrum_rpc/connection_manager/manager.rs +++ b/mm2src/coins/utxo/rpc_clients/electrum_rpc/connection_manager/manager.rs @@ -9,7 +9,7 @@ use super::connection_context::ConnectionContext; use crate::utxo::rpc_clients::UtxoRpcClientOps; use common::executor::abortable_queue::AbortableQueue; use common::executor::{AbortableSystem, SpawnFuture, Timer}; -use common::log::{debug, error}; +use common::log::{debug, error, LogOnError}; use common::notifier::{Notifiee, Notifier}; use common::now_ms; use keys::Address; @@ -277,7 +277,7 @@ impl ConnectionManager { let abandoned_subs = connection_ctx.disconnected(); // Re-subscribe the abandoned addresses using the client. let client = unwrap_or_return!(self.get_client()); - client.subscribe_addresses(abandoned_subs).ok(); + client.subscribe_addresses(abandoned_subs).error_log(); } /// A method that should be called after using a specific server for some request. From 8496a22184d6dd2e353332aac6725af09baef10d Mon Sep 17 00:00:00 2001 From: Omer Yacine Date: Tue, 29 Apr 2025 19:05:05 +0300 Subject: [PATCH 2/6] register all the wallet addresses initially in utxo balance event streamer --- mm2src/coins/utxo/utxo_balance_events.rs | 83 ++++++++++++++---------- 1 file changed, 48 insertions(+), 35 deletions(-) diff --git a/mm2src/coins/utxo/utxo_balance_events.rs b/mm2src/coins/utxo/utxo_balance_events.rs index 5de90555e4..458277e700 100644 --- a/mm2src/coins/utxo/utxo_balance_events.rs +++ b/mm2src/coins/utxo/utxo_balance_events.rs @@ -58,45 +58,29 @@ impl EventStreamer for UtxoBalanceEventStreamer { const RECEIVER_DROPPED_MSG: &str = "Receiver is dropped, which should never happen."; let streamer_id = self.streamer_id(); let coin = self.coin; + let mut scripthash_to_address_map = BTreeMap::default(); - async fn subscribe_to_addresses( - utxo: &UtxoCoinFields, - addresses: HashSet
, - ) -> Result, String> { - match utxo.rpc_client.clone() { - UtxoRpcClientEnum::Electrum(client) => { - // Collect the scrpithash for every address into a map. - let scripthash_to_address_map = addresses - .into_iter() - .map(|address| { - let scripthash = address_to_scripthash(&address).map_err(|e| e.to_string())?; - Ok((scripthash, address)) - }) - .collect::, String>>()?; - // Add these subscriptions to the connection manager. It will choose whatever connections - // it sees fit to subscribe each of these addresses to. - client - .connection_manager - .add_subscriptions(&scripthash_to_address_map) - .await; - // Convert the hashmap back to btreemap. - Ok(scripthash_to_address_map.into_iter().map(|(k, v)| (k, v)).collect()) - }, - UtxoRpcClientEnum::Native(_) => { - Err("Balance streaming is currently not supported for native client.".to_owned()) - }, - } - } - - if coin.as_ref().rpc_client.is_native() { - let msg = "Native RPC client is not supported for UtxoBalanceEventStreamer."; - ready_tx.send(Err(msg.to_string())).expect(RECEIVER_DROPPED_MSG); - panic!("{}", msg); + // Get all the addresses to subscribe to their balance updates. + let all_addresses = match coin.all_addresses().await { + Ok(addresses) => addresses, + Err(e) => { + let msg = format!("Failed to get all addresses: {e}"); + ready_tx.send(Err(msg.clone())).expect(RECEIVER_DROPPED_MSG); + panic!("{}", msg); + }, + }; + // FIXME: This might take some good LONG time in an HD wallet with many addresses. + // We better optimistically respond to the `ready_tx` to avoid blocking the enabler response to the RPC. + match subscribe_to_addresses(coin.as_ref(), all_addresses).await { + Ok(initial_tracking_list) => scripthash_to_address_map.extend(initial_tracking_list), + Err(e) => { + let msg = format!("Failed to subscribe to balance events: {e}"); + ready_tx.send(Err(msg.clone())).expect(RECEIVER_DROPPED_MSG); + panic!("{}", msg); + }, } - ready_tx.send(Ok(())).expect(RECEIVER_DROPPED_MSG); - let mut scripthash_to_address_map = BTreeMap::default(); while let Some(message) = data_rx.next().await { let notified_scripthash = match message { ScripthashNotification::Triggered(t) => t, @@ -172,3 +156,32 @@ impl EventStreamer for UtxoBalanceEventStreamer { } } } + +async fn subscribe_to_addresses( + utxo: &UtxoCoinFields, + addresses: HashSet
, +) -> Result, String> { + match utxo.rpc_client.clone() { + UtxoRpcClientEnum::Electrum(client) => { + // Collect the scrpithash for every address into a map. + let scripthash_to_address_map = addresses + .into_iter() + .map(|address| { + let scripthash = address_to_scripthash(&address).map_err(|e| e.to_string())?; + Ok((scripthash, address)) + }) + .collect::, String>>()?; + // Add these subscriptions to the connection manager. It will choose whatever connections + // it sees fit to subscribe each of these addresses to. + client + .connection_manager + .add_subscriptions(&scripthash_to_address_map) + .await; + // Convert the hashmap back to btreemap. + Ok(scripthash_to_address_map.into_iter().map(|(k, v)| (k, v)).collect()) + }, + UtxoRpcClientEnum::Native(_) => { + Err("Balance streaming is currently not supported for native client.".to_owned()) + }, + } +} From c3b20474dddb79a5a85f276511ffe40f2b9991c7 Mon Sep 17 00:00:00 2001 From: Omer Yacine Date: Wed, 30 Apr 2025 20:25:12 +0300 Subject: [PATCH 3/6] review(sami): ditch the btreemap --- mm2src/coins/utxo/utxo_balance_events.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/mm2src/coins/utxo/utxo_balance_events.rs b/mm2src/coins/utxo/utxo_balance_events.rs index 458277e700..edeab66d90 100644 --- a/mm2src/coins/utxo/utxo_balance_events.rs +++ b/mm2src/coins/utxo/utxo_balance_events.rs @@ -13,7 +13,7 @@ use futures::channel::oneshot; use futures::StreamExt; use keys::Address; use mm2_event_stream::{Broadcaster, Event, EventStreamer, StreamHandlerInput}; -use std::collections::{BTreeMap, HashMap, HashSet}; +use std::collections::{HashMap, HashSet}; macro_rules! try_or_continue { ($exp:expr) => { @@ -58,7 +58,7 @@ impl EventStreamer for UtxoBalanceEventStreamer { const RECEIVER_DROPPED_MSG: &str = "Receiver is dropped, which should never happen."; let streamer_id = self.streamer_id(); let coin = self.coin; - let mut scripthash_to_address_map = BTreeMap::default(); + let mut scripthash_to_address_map = HashMap::new(); // Get all the addresses to subscribe to their balance updates. let all_addresses = match coin.all_addresses().await { @@ -160,7 +160,7 @@ impl EventStreamer for UtxoBalanceEventStreamer { async fn subscribe_to_addresses( utxo: &UtxoCoinFields, addresses: HashSet
, -) -> Result, String> { +) -> Result, String> { match utxo.rpc_client.clone() { UtxoRpcClientEnum::Electrum(client) => { // Collect the scrpithash for every address into a map. @@ -170,15 +170,14 @@ async fn subscribe_to_addresses( let scripthash = address_to_scripthash(&address).map_err(|e| e.to_string())?; Ok((scripthash, address)) }) - .collect::, String>>()?; + .collect::>()?; // Add these subscriptions to the connection manager. It will choose whatever connections // it sees fit to subscribe each of these addresses to. client .connection_manager .add_subscriptions(&scripthash_to_address_map) .await; - // Convert the hashmap back to btreemap. - Ok(scripthash_to_address_map.into_iter().map(|(k, v)| (k, v)).collect()) + Ok(scripthash_to_address_map) }, UtxoRpcClientEnum::Native(_) => { Err("Balance streaming is currently not supported for native client.".to_owned()) From b68cb029b1ee12fafac8070d299da416ceb3bc47 Mon Sep 17 00:00:00 2001 From: Omer Yacine Date: Thu, 1 May 2025 13:45:55 +0300 Subject: [PATCH 4/6] make subscribe_to_addresses infallible --- mm2src/coins/utxo/utxo_balance_events.rs | 50 ++++++++++-------------- 1 file changed, 21 insertions(+), 29 deletions(-) diff --git a/mm2src/coins/utxo/utxo_balance_events.rs b/mm2src/coins/utxo/utxo_balance_events.rs index edeab66d90..11b78b8d7f 100644 --- a/mm2src/coins/utxo/utxo_balance_events.rs +++ b/mm2src/coins/utxo/utxo_balance_events.rs @@ -60,6 +60,12 @@ impl EventStreamer for UtxoBalanceEventStreamer { let coin = self.coin; let mut scripthash_to_address_map = HashMap::new(); + // Make sure the RPC client is not native. That doesn't support balance streaming. + if let UtxoRpcClientEnum::Native(_) = coin.as_ref().rpc_client { + let msg = "Balance streaming is not supported for native RPC client."; + ready_tx.send(Err(msg.to_string())).expect(RECEIVER_DROPPED_MSG); + panic!("{}", msg); + }; // Get all the addresses to subscribe to their balance updates. let all_addresses = match coin.all_addresses().await { Ok(addresses) => addresses, @@ -69,31 +75,18 @@ impl EventStreamer for UtxoBalanceEventStreamer { panic!("{}", msg); }, }; - // FIXME: This might take some good LONG time in an HD wallet with many addresses. - // We better optimistically respond to the `ready_tx` to avoid blocking the enabler response to the RPC. - match subscribe_to_addresses(coin.as_ref(), all_addresses).await { - Ok(initial_tracking_list) => scripthash_to_address_map.extend(initial_tracking_list), - Err(e) => { - let msg = format!("Failed to subscribe to balance events: {e}"); - ready_tx.send(Err(msg.clone())).expect(RECEIVER_DROPPED_MSG); - panic!("{}", msg); - }, - } ready_tx.send(Ok(())).expect(RECEIVER_DROPPED_MSG); + // Initially, subscribe to all the addresses we currently have. + let tracking_list = subscribe_to_addresses(coin.as_ref(), all_addresses).await; + scripthash_to_address_map.extend(tracking_list); + while let Some(message) = data_rx.next().await { let notified_scripthash = match message { ScripthashNotification::Triggered(t) => t, ScripthashNotification::SubscribeToAddresses(addresses) => { - match subscribe_to_addresses(coin.as_ref(), addresses).await { - Ok(map) => scripthash_to_address_map.extend(map), - Err(e) => { - log::error!("{e}"); - - broadcaster.broadcast(Event::err(streamer_id.clone(), json!({ "error": e }))); - }, - }; - + let tracking_list = subscribe_to_addresses(coin.as_ref(), addresses).await; + scripthash_to_address_map.extend(tracking_list); continue; }, }; @@ -157,30 +150,29 @@ impl EventStreamer for UtxoBalanceEventStreamer { } } -async fn subscribe_to_addresses( - utxo: &UtxoCoinFields, - addresses: HashSet
, -) -> Result, String> { +async fn subscribe_to_addresses(utxo: &UtxoCoinFields, addresses: HashSet
) -> HashMap { match utxo.rpc_client.clone() { UtxoRpcClientEnum::Electrum(client) => { // Collect the scrpithash for every address into a map. let scripthash_to_address_map = addresses .into_iter() - .map(|address| { - let scripthash = address_to_scripthash(&address).map_err(|e| e.to_string())?; - Ok((scripthash, address)) + .filter_map(|address| { + let scripthash = address_to_scripthash(&address) + .map_err(|e| log::error!("Failed to get scripthash for address {address}: {e}")) + .ok()?; + Some((scripthash, address)) }) - .collect::>()?; + .collect(); // Add these subscriptions to the connection manager. It will choose whatever connections // it sees fit to subscribe each of these addresses to. client .connection_manager .add_subscriptions(&scripthash_to_address_map) .await; - Ok(scripthash_to_address_map) + scripthash_to_address_map }, UtxoRpcClientEnum::Native(_) => { - Err("Balance streaming is currently not supported for native client.".to_owned()) + unreachable!("The caller of this func checked that the RPC client is electrum. Native client isn't supported for balance streaming.") }, } } From bb2b8465bf4c0f166d2309cfbd082bc0b95f8b38 Mon Sep 17 00:00:00 2001 From: Omer Yacine Date: Thu, 1 May 2025 14:38:19 +0300 Subject: [PATCH 5/6] review(shamardy): use `is_native()` utility function --- mm2src/coins/utxo/utxo_balance_events.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mm2src/coins/utxo/utxo_balance_events.rs b/mm2src/coins/utxo/utxo_balance_events.rs index 11b78b8d7f..d61fe4a1a9 100644 --- a/mm2src/coins/utxo/utxo_balance_events.rs +++ b/mm2src/coins/utxo/utxo_balance_events.rs @@ -61,7 +61,7 @@ impl EventStreamer for UtxoBalanceEventStreamer { let mut scripthash_to_address_map = HashMap::new(); // Make sure the RPC client is not native. That doesn't support balance streaming. - if let UtxoRpcClientEnum::Native(_) = coin.as_ref().rpc_client { + if coin.as_ref().rpc_client.is_native() { let msg = "Balance streaming is not supported for native RPC client."; ready_tx.send(Err(msg.to_string())).expect(RECEIVER_DROPPED_MSG); panic!("{}", msg); From 7ff638b8059b3ac7042d38578826914740407b84 Mon Sep 17 00:00:00 2001 From: Omer Yacine Date: Thu, 1 May 2025 14:51:24 +0300 Subject: [PATCH 6/6] review(shamardy): avoid panicking and just return an empty map --- mm2src/coins/utxo/utxo_balance_events.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/mm2src/coins/utxo/utxo_balance_events.rs b/mm2src/coins/utxo/utxo_balance_events.rs index d61fe4a1a9..8fdde86ab7 100644 --- a/mm2src/coins/utxo/utxo_balance_events.rs +++ b/mm2src/coins/utxo/utxo_balance_events.rs @@ -153,7 +153,7 @@ impl EventStreamer for UtxoBalanceEventStreamer { async fn subscribe_to_addresses(utxo: &UtxoCoinFields, addresses: HashSet
) -> HashMap { match utxo.rpc_client.clone() { UtxoRpcClientEnum::Electrum(client) => { - // Collect the scrpithash for every address into a map. + // Collect the scripthash for every address into a map. let scripthash_to_address_map = addresses .into_iter() .filter_map(|address| { @@ -172,7 +172,8 @@ async fn subscribe_to_addresses(utxo: &UtxoCoinFields, addresses: HashSet { - unreachable!("The caller of this func checked that the RPC client is electrum. Native client isn't supported for balance streaming.") + // Unreachable: The caller should have checked that the RPC client isn't native. + HashMap::new() }, } }