diff --git a/mm2src/coins/utxo/rpc_clients/electrum_rpc/client.rs b/mm2src/coins/utxo/rpc_clients/electrum_rpc/client.rs index 84e39997e4..e7c5311db2 100644 --- a/mm2src/coins/utxo/rpc_clients/electrum_rpc/client.rs +++ b/mm2src/coins/utxo/rpc_clients/electrum_rpc/client.rs @@ -51,7 +51,7 @@ use futures::stream::FuturesUnordered; use futures::StreamExt; use futures01::Future; use itertools::Itertools; -use mm2_event_stream::StreamingManager; +use mm2_event_stream::{StreamingManager, StreamingManagerError}; use serde_json::{self as json, Value as Json}; type ElectrumTxHistory = Vec; @@ -179,26 +179,28 @@ impl ElectrumClientImpl { /// Sends a list of addresses through the scripthash notification sender to subscribe to their scripthash notifications. pub fn subscribe_addresses(&self, addresses: HashSet
) -> Result<(), String> { - self.streaming_manager - .send( - &UtxoBalanceEventStreamer::derive_streamer_id(&self.coin_ticker), - ScripthashNotification::SubscribeToAddresses(addresses), - ) - .map_err(|e| ERRL!("Failed sending scripthash message. {:?}", e))?; - Ok(()) + match self.streaming_manager.send( + &UtxoBalanceEventStreamer::derive_streamer_id(&self.coin_ticker), + ScripthashNotification::SubscribeToAddresses(addresses), + ) { + // Don't error if the streamer isn't found/enabled. + Err(StreamingManagerError::StreamerNotFound) | Ok(()) => Ok(()), + Err(e) => Err(format!("Failed sending scripthash message. {:?}", e)), + } } /// Notifies the Utxo balance streamer of a new script hash balance change. /// /// The streamer will figure out which address this scripthash belongs to and will broadcast an notification to clients. pub fn notify_triggered_hash(&self, script_hash: String) -> Result<(), String> { - self.streaming_manager - .send( - &UtxoBalanceEventStreamer::derive_streamer_id(&self.coin_ticker), - ScripthashNotification::Triggered(script_hash), - ) - .map_err(|e| ERRL!("Failed sending scripthash message. {:?}", e))?; - Ok(()) + match self.streaming_manager.send( + &UtxoBalanceEventStreamer::derive_streamer_id(&self.coin_ticker), + ScripthashNotification::Triggered(script_hash), + ) { + // Don't error if the streamer isn't found/enabled. + Err(StreamingManagerError::StreamerNotFound) | Ok(()) => Ok(()), + Err(e) => Err(format!("Failed sending scripthash message. {:?}", e)), + } } /// Get block headers storage. diff --git a/mm2src/coins/utxo/rpc_clients/electrum_rpc/connection_manager/manager.rs b/mm2src/coins/utxo/rpc_clients/electrum_rpc/connection_manager/manager.rs index b06628fd60..a3b792e9f6 100644 --- a/mm2src/coins/utxo/rpc_clients/electrum_rpc/connection_manager/manager.rs +++ b/mm2src/coins/utxo/rpc_clients/electrum_rpc/connection_manager/manager.rs @@ -9,7 +9,7 @@ use super::connection_context::ConnectionContext; use crate::utxo::rpc_clients::UtxoRpcClientOps; use common::executor::abortable_queue::AbortableQueue; use common::executor::{AbortableSystem, SpawnFuture, Timer}; -use common::log::{debug, error}; +use common::log::{debug, error, LogOnError}; use common::notifier::{Notifiee, Notifier}; use common::now_ms; use keys::Address; @@ -277,7 +277,7 @@ impl ConnectionManager { let abandoned_subs = connection_ctx.disconnected(); // Re-subscribe the abandoned addresses using the client. let client = unwrap_or_return!(self.get_client()); - client.subscribe_addresses(abandoned_subs).ok(); + client.subscribe_addresses(abandoned_subs).error_log(); } /// A method that should be called after using a specific server for some request. diff --git a/mm2src/coins/utxo/utxo_balance_events.rs b/mm2src/coins/utxo/utxo_balance_events.rs index 5de90555e4..8fdde86ab7 100644 --- a/mm2src/coins/utxo/utxo_balance_events.rs +++ b/mm2src/coins/utxo/utxo_balance_events.rs @@ -13,7 +13,7 @@ use futures::channel::oneshot; use futures::StreamExt; use keys::Address; use mm2_event_stream::{Broadcaster, Event, EventStreamer, StreamHandlerInput}; -use std::collections::{BTreeMap, HashMap, HashSet}; +use std::collections::{HashMap, HashSet}; macro_rules! try_or_continue { ($exp:expr) => { @@ -58,58 +58,35 @@ impl EventStreamer for UtxoBalanceEventStreamer { const RECEIVER_DROPPED_MSG: &str = "Receiver is dropped, which should never happen."; let streamer_id = self.streamer_id(); let coin = self.coin; + let mut scripthash_to_address_map = HashMap::new(); - async fn subscribe_to_addresses( - utxo: &UtxoCoinFields, - addresses: HashSet
, - ) -> Result, String> { - match utxo.rpc_client.clone() { - UtxoRpcClientEnum::Electrum(client) => { - // Collect the scrpithash for every address into a map. - let scripthash_to_address_map = addresses - .into_iter() - .map(|address| { - let scripthash = address_to_scripthash(&address).map_err(|e| e.to_string())?; - Ok((scripthash, address)) - }) - .collect::, String>>()?; - // Add these subscriptions to the connection manager. It will choose whatever connections - // it sees fit to subscribe each of these addresses to. - client - .connection_manager - .add_subscriptions(&scripthash_to_address_map) - .await; - // Convert the hashmap back to btreemap. - Ok(scripthash_to_address_map.into_iter().map(|(k, v)| (k, v)).collect()) - }, - UtxoRpcClientEnum::Native(_) => { - Err("Balance streaming is currently not supported for native client.".to_owned()) - }, - } - } - + // Make sure the RPC client is not native. That doesn't support balance streaming. if coin.as_ref().rpc_client.is_native() { - let msg = "Native RPC client is not supported for UtxoBalanceEventStreamer."; + let msg = "Balance streaming is not supported for native RPC client."; ready_tx.send(Err(msg.to_string())).expect(RECEIVER_DROPPED_MSG); panic!("{}", msg); - } - + }; + // Get all the addresses to subscribe to their balance updates. + let all_addresses = match coin.all_addresses().await { + Ok(addresses) => addresses, + Err(e) => { + let msg = format!("Failed to get all addresses: {e}"); + ready_tx.send(Err(msg.clone())).expect(RECEIVER_DROPPED_MSG); + panic!("{}", msg); + }, + }; ready_tx.send(Ok(())).expect(RECEIVER_DROPPED_MSG); - let mut scripthash_to_address_map = BTreeMap::default(); + // Initially, subscribe to all the addresses we currently have. + let tracking_list = subscribe_to_addresses(coin.as_ref(), all_addresses).await; + scripthash_to_address_map.extend(tracking_list); + while let Some(message) = data_rx.next().await { let notified_scripthash = match message { ScripthashNotification::Triggered(t) => t, ScripthashNotification::SubscribeToAddresses(addresses) => { - match subscribe_to_addresses(coin.as_ref(), addresses).await { - Ok(map) => scripthash_to_address_map.extend(map), - Err(e) => { - log::error!("{e}"); - - broadcaster.broadcast(Event::err(streamer_id.clone(), json!({ "error": e }))); - }, - }; - + let tracking_list = subscribe_to_addresses(coin.as_ref(), addresses).await; + scripthash_to_address_map.extend(tracking_list); continue; }, }; @@ -172,3 +149,31 @@ impl EventStreamer for UtxoBalanceEventStreamer { } } } + +async fn subscribe_to_addresses(utxo: &UtxoCoinFields, addresses: HashSet
) -> HashMap { + match utxo.rpc_client.clone() { + UtxoRpcClientEnum::Electrum(client) => { + // Collect the scripthash for every address into a map. + let scripthash_to_address_map = addresses + .into_iter() + .filter_map(|address| { + let scripthash = address_to_scripthash(&address) + .map_err(|e| log::error!("Failed to get scripthash for address {address}: {e}")) + .ok()?; + Some((scripthash, address)) + }) + .collect(); + // Add these subscriptions to the connection manager. It will choose whatever connections + // it sees fit to subscribe each of these addresses to. + client + .connection_manager + .add_subscriptions(&scripthash_to_address_map) + .await; + scripthash_to_address_map + }, + UtxoRpcClientEnum::Native(_) => { + // Unreachable: The caller should have checked that the RPC client isn't native. + HashMap::new() + }, + } +}