Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 17 additions & 15 deletions mm2src/coins/utxo/rpc_clients/electrum_rpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use futures::stream::FuturesUnordered;
use futures::StreamExt;
use futures01::Future;
use itertools::Itertools;
use mm2_event_stream::StreamingManager;
use mm2_event_stream::{StreamingManager, StreamingManagerError};
use serde_json::{self as json, Value as Json};

type ElectrumTxHistory = Vec<ElectrumTxHistoryItem>;
Expand Down Expand Up @@ -179,26 +179,28 @@ impl ElectrumClientImpl {

/// Sends a list of addresses through the scripthash notification sender to subscribe to their scripthash notifications.
pub fn subscribe_addresses(&self, addresses: HashSet<Address>) -> Result<(), String> {
self.streaming_manager
.send(
&UtxoBalanceEventStreamer::derive_streamer_id(&self.coin_ticker),
ScripthashNotification::SubscribeToAddresses(addresses),
)
.map_err(|e| ERRL!("Failed sending scripthash message. {:?}", e))?;
Ok(())
match self.streaming_manager.send(
&UtxoBalanceEventStreamer::derive_streamer_id(&self.coin_ticker),
ScripthashNotification::SubscribeToAddresses(addresses),
) {
// Don't error if the streamer isn't found/enabled.
Err(StreamingManagerError::StreamerNotFound) | Ok(()) => Ok(()),
Err(e) => Err(format!("Failed sending scripthash message. {:?}", e)),
}
}

/// Notifies the Utxo balance streamer of a new script hash balance change.
///
/// The streamer will figure out which address this scripthash belongs to and will broadcast an notification to clients.
pub fn notify_triggered_hash(&self, script_hash: String) -> Result<(), String> {
self.streaming_manager
.send(
&UtxoBalanceEventStreamer::derive_streamer_id(&self.coin_ticker),
ScripthashNotification::Triggered(script_hash),
)
.map_err(|e| ERRL!("Failed sending scripthash message. {:?}", e))?;
Ok(())
match self.streaming_manager.send(
&UtxoBalanceEventStreamer::derive_streamer_id(&self.coin_ticker),
ScripthashNotification::Triggered(script_hash),
) {
// Don't error if the streamer isn't found/enabled.
Err(StreamingManagerError::StreamerNotFound) | Ok(()) => Ok(()),
Err(e) => Err(format!("Failed sending scripthash message. {:?}", e)),
}
}

/// Get block headers storage.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use super::connection_context::ConnectionContext;
use crate::utxo::rpc_clients::UtxoRpcClientOps;
use common::executor::abortable_queue::AbortableQueue;
use common::executor::{AbortableSystem, SpawnFuture, Timer};
use common::log::{debug, error};
use common::log::{debug, error, LogOnError};
use common::notifier::{Notifiee, Notifier};
use common::now_ms;
use keys::Address;
Expand Down Expand Up @@ -277,7 +277,7 @@ impl ConnectionManager {
let abandoned_subs = connection_ctx.disconnected();
// Re-subscribe the abandoned addresses using the client.
let client = unwrap_or_return!(self.get_client());
client.subscribe_addresses(abandoned_subs).ok();
client.subscribe_addresses(abandoned_subs).error_log();
}

/// A method that should be called after using a specific server for some request.
Expand Down
91 changes: 48 additions & 43 deletions mm2src/coins/utxo/utxo_balance_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use futures::channel::oneshot;
use futures::StreamExt;
use keys::Address;
use mm2_event_stream::{Broadcaster, Event, EventStreamer, StreamHandlerInput};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::collections::{HashMap, HashSet};

macro_rules! try_or_continue {
($exp:expr) => {
Expand Down Expand Up @@ -58,58 +58,35 @@ impl EventStreamer for UtxoBalanceEventStreamer {
const RECEIVER_DROPPED_MSG: &str = "Receiver is dropped, which should never happen.";
let streamer_id = self.streamer_id();
let coin = self.coin;
let mut scripthash_to_address_map = HashMap::new();

async fn subscribe_to_addresses(
utxo: &UtxoCoinFields,
addresses: HashSet<Address>,
) -> Result<BTreeMap<String, Address>, String> {
match utxo.rpc_client.clone() {
UtxoRpcClientEnum::Electrum(client) => {
// Collect the scrpithash for every address into a map.
let scripthash_to_address_map = addresses
.into_iter()
.map(|address| {
let scripthash = address_to_scripthash(&address).map_err(|e| e.to_string())?;
Ok((scripthash, address))
})
.collect::<Result<HashMap<String, Address>, String>>()?;
// Add these subscriptions to the connection manager. It will choose whatever connections
// it sees fit to subscribe each of these addresses to.
client
.connection_manager
.add_subscriptions(&scripthash_to_address_map)
.await;
// Convert the hashmap back to btreemap.
Ok(scripthash_to_address_map.into_iter().map(|(k, v)| (k, v)).collect())
},
UtxoRpcClientEnum::Native(_) => {
Err("Balance streaming is currently not supported for native client.".to_owned())
},
}
}

// Make sure the RPC client is not native. That doesn't support balance streaming.
if coin.as_ref().rpc_client.is_native() {
let msg = "Native RPC client is not supported for UtxoBalanceEventStreamer.";
let msg = "Balance streaming is not supported for native RPC client.";
ready_tx.send(Err(msg.to_string())).expect(RECEIVER_DROPPED_MSG);
panic!("{}", msg);
}

};
// Get all the addresses to subscribe to their balance updates.
let all_addresses = match coin.all_addresses().await {
Comment thread
mariocynicys marked this conversation as resolved.
Ok(addresses) => addresses,
Err(e) => {
let msg = format!("Failed to get all addresses: {e}");
ready_tx.send(Err(msg.clone())).expect(RECEIVER_DROPPED_MSG);
panic!("{}", msg);
Comment thread
shamardy marked this conversation as resolved.
},
};
ready_tx.send(Ok(())).expect(RECEIVER_DROPPED_MSG);

let mut scripthash_to_address_map = BTreeMap::default();
// Initially, subscribe to all the addresses we currently have.
let tracking_list = subscribe_to_addresses(coin.as_ref(), all_addresses).await;
scripthash_to_address_map.extend(tracking_list);

while let Some(message) = data_rx.next().await {
let notified_scripthash = match message {
ScripthashNotification::Triggered(t) => t,
ScripthashNotification::SubscribeToAddresses(addresses) => {
match subscribe_to_addresses(coin.as_ref(), addresses).await {
Ok(map) => scripthash_to_address_map.extend(map),
Err(e) => {
log::error!("{e}");

broadcaster.broadcast(Event::err(streamer_id.clone(), json!({ "error": e })));
},
};

let tracking_list = subscribe_to_addresses(coin.as_ref(), addresses).await;
scripthash_to_address_map.extend(tracking_list);
continue;
},
};
Expand Down Expand Up @@ -172,3 +149,31 @@ impl EventStreamer for UtxoBalanceEventStreamer {
}
}
}

async fn subscribe_to_addresses(utxo: &UtxoCoinFields, addresses: HashSet<Address>) -> HashMap<String, Address> {
match utxo.rpc_client.clone() {
UtxoRpcClientEnum::Electrum(client) => {
// Collect the scripthash for every address into a map.
let scripthash_to_address_map = addresses
.into_iter()
.filter_map(|address| {
let scripthash = address_to_scripthash(&address)
.map_err(|e| log::error!("Failed to get scripthash for address {address}: {e}"))
.ok()?;
Some((scripthash, address))
})
.collect();
// Add these subscriptions to the connection manager. It will choose whatever connections
// it sees fit to subscribe each of these addresses to.
client
.connection_manager
.add_subscriptions(&scripthash_to_address_map)
.await;
scripthash_to_address_map
},
UtxoRpcClientEnum::Native(_) => {
// Unreachable: The caller should have checked that the RPC client isn't native.
HashMap::new()
},
}
}
Loading