diff --git a/mm2src/coins/coin_balance.rs b/mm2src/coins/coin_balance.rs index 8dca163419..b5f4a511e1 100644 --- a/mm2src/coins/coin_balance.rs +++ b/mm2src/coins/coin_balance.rs @@ -10,7 +10,7 @@ use futures::compat::Future01CompatExt; use mm2_err_handle::prelude::*; use mm2_number::BigDecimal; #[cfg(test)] use mocktopus::macros::*; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::ops::Range; use std::{fmt, iter}; @@ -321,6 +321,12 @@ pub trait HDWalletBalanceOps: HDWalletCoinOps { let balance = self.known_address_balance(address).await?; Ok(AddressBalanceStatus::Used(balance)) } + + /// Prepares addresses for real time balance streaming if coin balance event is enabled. + async fn prepare_addresses_for_balance_stream_if_enabled( + &self, + addresses: HashSet, + ) -> MmResult<(), String>; } #[async_trait] diff --git a/mm2src/coins/rpc_command/get_new_address.rs b/mm2src/coins/rpc_command/get_new_address.rs index 54263226e6..5293d8ff69 100644 --- a/mm2src/coins/rpc_command/get_new_address.rs +++ b/mm2src/coins/rpc_command/get_new_address.rs @@ -54,6 +54,8 @@ pub enum GetNewAddressRpcError { RpcInvalidResponse(String), #[display(fmt = "HD wallet storage error: {_0}")] WalletStorageError(String), + #[display(fmt = "Failed scripthash subscription. Error: {_0}")] + FailedScripthashSubscription(String), #[from_trait(WithTimeout::timeout)] #[display(fmt = "RPC timed out {_0:?}")] Timeout(Duration), @@ -183,6 +185,7 @@ impl HttpStatusCode for GetNewAddressRpcError { GetNewAddressRpcError::Transport(_) | GetNewAddressRpcError::RpcInvalidResponse(_) | GetNewAddressRpcError::WalletStorageError(_) + | GetNewAddressRpcError::FailedScripthashSubscription(_) | GetNewAddressRpcError::HwError(_) | GetNewAddressRpcError::Internal(_) => StatusCode::INTERNAL_SERVER_ERROR, GetNewAddressRpcError::Timeout(_) => StatusCode::REQUEST_TIMEOUT, @@ -380,8 +383,10 @@ pub(crate) mod common_impl { use super::*; use crate::coin_balance::{HDAddressBalanceScanner, HDWalletBalanceOps}; use crate::hd_wallet::{HDAccountOps, HDWalletCoinOps, HDWalletOps}; + use crate::utxo::UtxoCommonOps; use crate::{CoinWithDerivationMethod, HDAddress}; use crypto::RpcDerivationPath; + use std::collections::HashSet; use std::fmt; use std::ops::DerefMut; @@ -435,9 +440,12 @@ pub(crate) mod common_impl { ) -> MmResult where ConfirmAddress: HDConfirmAddress, - Coin: - HDWalletBalanceOps + CoinWithDerivationMethod::HDWallet> + Send + Sync, - ::Address: fmt::Display, + Coin: UtxoCommonOps + + HDWalletBalanceOps + + CoinWithDerivationMethod::HDWallet> + + Send + + Sync, + ::Address: fmt::Display + Into + std::hash::Hash + std::cmp::Eq, { let hd_wallet = coin.derivation_method().hd_wallet_or_err()?; @@ -462,9 +470,16 @@ pub(crate) mod common_impl { .await?; let balance = coin.known_address_balance(&address).await?; + + let address_as_string = address.to_string(); + + coin.prepare_addresses_for_balance_stream_if_enabled(HashSet::from([address])) + .await + .map_err(|e| GetNewAddressRpcError::FailedScripthashSubscription(e.to_string()))?; + Ok(GetNewAddressResponse { new_address: HDAddressBalance { - address: address.to_string(), + address: address_as_string, derivation_path: RpcDerivationPath(derivation_path), chain, balance, diff --git a/mm2src/coins/rpc_command/hd_account_balance_rpc_error.rs b/mm2src/coins/rpc_command/hd_account_balance_rpc_error.rs index e929692285..dfb369f90e 100644 --- a/mm2src/coins/rpc_command/hd_account_balance_rpc_error.rs +++ b/mm2src/coins/rpc_command/hd_account_balance_rpc_error.rs @@ -26,6 +26,8 @@ pub enum HDAccountBalanceRpcError { WalletStorageError(String), #[display(fmt = "Electrum/Native RPC invalid response: {}", _0)] RpcInvalidResponse(String), + #[display(fmt = "Failed scripthash subscription. Error: {_0}")] + FailedScripthashSubscription(String), #[display(fmt = "Transport: {}", _0)] Transport(String), #[display(fmt = "Internal: {}", _0)] @@ -44,6 +46,7 @@ impl HttpStatusCode for HDAccountBalanceRpcError { HDAccountBalanceRpcError::Transport(_) | HDAccountBalanceRpcError::WalletStorageError(_) | HDAccountBalanceRpcError::RpcInvalidResponse(_) + | HDAccountBalanceRpcError::FailedScripthashSubscription(_) | HDAccountBalanceRpcError::Internal(_) => StatusCode::INTERNAL_SERVER_ERROR, } } diff --git a/mm2src/coins/rpc_command/init_scan_for_new_addresses.rs b/mm2src/coins/rpc_command/init_scan_for_new_addresses.rs index e90d44eecc..d0aaa8423f 100644 --- a/mm2src/coins/rpc_command/init_scan_for_new_addresses.rs +++ b/mm2src/coins/rpc_command/init_scan_for_new_addresses.rs @@ -130,17 +130,23 @@ pub mod common_impl { use super::*; use crate::coin_balance::HDWalletBalanceOps; use crate::hd_wallet::{HDAccountOps, HDWalletCoinOps, HDWalletOps}; + use crate::utxo::UtxoCommonOps; use crate::CoinWithDerivationMethod; - use std::fmt; + use keys::Address; + use std::collections::HashSet; use std::ops::DerefMut; + use std::str::FromStr; pub async fn scan_for_new_addresses_rpc( coin: &Coin, params: ScanAddressesParams, ) -> MmResult where - Coin: CoinWithDerivationMethod::HDWallet> + HDWalletBalanceOps + Sync, - ::Address: fmt::Display, + Coin: UtxoCommonOps + + CoinWithDerivationMethod::HDWallet> + + HDWalletBalanceOps + + Sync, + HashSet<::Address>: From>, { let hd_wallet = coin.derivation_method().hd_wallet_or_err()?; @@ -157,6 +163,15 @@ pub mod common_impl { .scan_for_new_addresses(hd_wallet, hd_account.deref_mut(), &address_scanner, gap_limit) .await?; + let addresses: HashSet<_> = new_addresses + .iter() + .map(|address_balance| Address::from_str(&address_balance.address).expect("Valid address")) + .collect(); + + coin.prepare_addresses_for_balance_stream_if_enabled(addresses.into()) + .await + .map_err(|e| HDAccountBalanceRpcError::FailedScripthashSubscription(e.to_string()))?; + Ok(ScanAddressesResponse { account_index: account_id, derivation_path: RpcDerivationPath(account_derivation_path), diff --git a/mm2src/coins/tendermint/tendermint_balance_events.rs b/mm2src/coins/tendermint/tendermint_balance_events.rs index 122262eb51..2bbdd59824 100644 --- a/mm2src/coins/tendermint/tendermint_balance_events.rs +++ b/mm2src/coins/tendermint/tendermint_balance_events.rs @@ -114,6 +114,7 @@ impl EventBehaviour for TendermintCoin { }) .collect(); + let mut balance_updates = vec![]; for denom in denoms { if let Some((ticker, decimals)) = self.active_ticker_and_decimals_from_denom(&denom) { let balance_denom = match self.account_balance_for_denom(&self.account_id, denom).await { @@ -139,17 +140,22 @@ impl EventBehaviour for TendermintCoin { } if broadcast { - let payload = json!({ + balance_updates.push(json!({ "ticker": ticker, "balance": { "spendable": balance_decimal, "unspendable": BigDecimal::default() } - }); - - ctx.stream_channel_controller - .broadcast(Event::new(Self::EVENT_NAME.to_string(), payload.to_string())) - .await; + })); } } } + + if !balance_updates.is_empty() { + ctx.stream_channel_controller + .broadcast(Event::new( + Self::EVENT_NAME.to_string(), + json!(balance_updates).to_string(), + )) + .await; + } } } } diff --git a/mm2src/coins/utxo.rs b/mm2src/coins/utxo.rs index 622f2eebb5..bc8620f331 100644 --- a/mm2src/coins/utxo.rs +++ b/mm2src/coins/utxo.rs @@ -32,6 +32,7 @@ pub mod rpc_clients; pub mod slp; pub mod spv; pub mod swap_proto_v2_scripts; +pub mod utxo_balance_events; pub mod utxo_block_header_storage; pub mod utxo_builder; pub mod utxo_common; @@ -55,7 +56,7 @@ use crypto::{Bip32DerPathOps, Bip32Error, Bip44Chain, ChildNumber, DerivationPat StandardHDCoinAddress, StandardHDPathError, StandardHDPathToAccount, StandardHDPathToCoin}; use derive_more::Display; #[cfg(not(target_arch = "wasm32"))] use dirs::home_dir; -use futures::channel::mpsc::{Receiver as AsyncReceiver, Sender as AsyncSender, UnboundedSender}; +use futures::channel::mpsc::{Receiver as AsyncReceiver, Sender as AsyncSender, UnboundedReceiver, UnboundedSender}; use futures::compat::Future01CompatExt; use futures::lock::{Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard}; use futures01::Future; @@ -65,7 +66,7 @@ pub use keys::{Address, AddressFormat as UtxoAddressFormat, AddressHashEnum, Key Type as ScriptType}; #[cfg(not(target_arch = "wasm32"))] use lightning_invoice::Currency as LightningCurrency; -use mm2_core::mm_ctx::MmArc; +use mm2_core::mm_ctx::{MmArc, MmWeak}; use mm2_err_handle::prelude::*; use mm2_metrics::MetricsArc; use mm2_number::BigDecimal; @@ -142,6 +143,15 @@ pub type MatureUnspentMap = HashMap; pub type RecentlySpentOutPointsGuard<'a> = AsyncMutexGuard<'a, RecentlySpentOutPoints>; pub type UtxoHDAddress = HDAddress; +pub enum ScripthashNotification { + Triggered(String), + SubscribeToAddresses(HashSet
), + RefreshSubscriptions, +} + +pub type ScripthashNotificationSender = Option>; +type ScripthashNotificationHandler = Option>>>; + #[cfg(windows)] #[cfg(not(target_arch = "wasm32"))] fn get_special_folder_path() -> PathBuf { @@ -610,6 +620,11 @@ pub struct UtxoCoinFields { /// This abortable system is used to spawn coin's related futures that should be aborted on coin deactivation /// and on [`MmArc::stop`]. pub abortable_system: AbortableQueue, + pub(crate) ctx: MmWeak, + /// This is used for balance event streaming implementation for UTXOs. + /// If balance event streaming isn't enabled, this value will always be `None`; otherwise, + /// it will be used for receiving scripthash notifications to re-fetch balances. + scripthash_notification_handler: ScripthashNotificationHandler, } #[derive(Debug, Display)] diff --git a/mm2src/coins/utxo/qtum.rs b/mm2src/coins/utxo/qtum.rs index a66ae04331..24861b8b75 100644 --- a/mm2src/coins/utxo/qtum.rs +++ b/mm2src/coins/utxo/qtum.rs @@ -1,3 +1,4 @@ +use super::utxo_common::utxo_prepare_addresses_for_balance_stream_if_enabled; use super::*; use crate::coin_balance::{self, EnableCoinBalanceError, EnabledCoinBalanceParams, HDAccountBalance, HDAddressBalance, HDWalletBalance, HDWalletBalanceOps}; @@ -1149,6 +1150,13 @@ impl HDWalletBalanceOps for QtumCoin { ) -> BalanceResult> { utxo_common::addresses_balances(self, addresses).await } + + async fn prepare_addresses_for_balance_stream_if_enabled( + &self, + addresses: HashSet, + ) -> MmResult<(), String> { + utxo_prepare_addresses_for_balance_stream_if_enabled(self, addresses).await + } } impl HDWalletCoinWithStorageOps for QtumCoin { diff --git a/mm2src/coins/utxo/rpc_clients.rs b/mm2src/coins/utxo/rpc_clients.rs index c4cb848dde..c8772066fe 100644 --- a/mm2src/coins/utxo/rpc_clients.rs +++ b/mm2src/coins/utxo/rpc_clients.rs @@ -3,7 +3,7 @@ use crate::utxo::utxo_block_header_storage::BlockHeaderStorage; use crate::utxo::{output_script, sat_from_big_decimal, GetBlockHeaderError, GetConfirmedTxError, GetTxError, - GetTxHeightError}; + GetTxHeightError, ScripthashNotification}; use crate::{big_decimal_from_sat_unsigned, NumConversError, RpcTransportEventHandler, RpcTransportEventHandlerShared}; use async_trait::async_trait; use chain::{BlockHeader, BlockHeaderBits, BlockHeaderNonce, OutPoint, Transaction as UtxoTx}; @@ -13,7 +13,7 @@ use common::executor::{abortable_queue, abortable_queue::AbortableQueue, Abortab use common::jsonrpc_client::{JsonRpcBatchClient, JsonRpcBatchResponse, JsonRpcClient, JsonRpcError, JsonRpcErrorType, JsonRpcId, JsonRpcMultiClient, JsonRpcRemoteAddr, JsonRpcRequest, JsonRpcRequestEnum, JsonRpcResponse, JsonRpcResponseEnum, JsonRpcResponseFut, RpcRes}; -use common::log::LogOnError; +use common::log::{debug, LogOnError}; use common::log::{error, info, warn}; use common::{median, now_float, now_ms, now_sec, OrdRange}; use derive_more::Display; @@ -52,6 +52,8 @@ use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering}; use std::sync::Arc; use std::time::Duration; +use super::ScripthashNotificationSender; + cfg_native! { use futures::future::Either; use futures::io::Error; @@ -113,6 +115,15 @@ pub enum UtxoRpcClientEnum { Electrum(ElectrumClient), } +impl ToString for UtxoRpcClientEnum { + fn to_string(&self) -> String { + match self { + UtxoRpcClientEnum::Native(_) => "native".to_owned(), + UtxoRpcClientEnum::Electrum(_) => "electrum".to_owned(), + } + } +} + impl From for UtxoRpcClientEnum { fn from(client: ElectrumClient) -> UtxoRpcClientEnum { UtxoRpcClientEnum::Electrum(client) } } @@ -345,6 +356,8 @@ pub trait UtxoRpcClientOps: fmt::Debug + Send + Sync + 'static { /// Submits the raw `tx` transaction (serialized, hex-encoded) to blockchain network. fn send_raw_transaction(&self, tx: BytesJson) -> UtxoRpcFut; + fn blockchain_scripthash_subscribe(&self, scripthash: String) -> UtxoRpcFut; + /// Returns raw transaction (serialized, hex-encoded) by the given `txid`. fn get_transaction_bytes(&self, txid: &H256Json) -> UtxoRpcFut; @@ -701,12 +714,12 @@ impl JsonRpcClient for NativeClientImpl { .body(Vec::from(request_body)) .map_err(|e| JsonRpcErrorType::InvalidRequest(e.to_string()))); - let event_handles = self.event_handlers.clone(); + let event_handlers = self.event_handlers.clone(); Box::new(slurp_req(http_request).boxed().compat().then( move |result| -> Result<(JsonRpcRemoteAddr, JsonRpcResponseEnum), JsonRpcErrorType> { let res = result.map_err(|e| e.into_inner())?; // measure now only body length, because the `hyper` crate doesn't allow to get total HTTP packet length - event_handles.on_incoming_response(&res.2); + event_handlers.on_incoming_response(&res.2); let body = std::str::from_utf8(&res.2).map_err(|e| JsonRpcErrorType::parse_error(&uri, e.to_string()))?; @@ -806,6 +819,13 @@ impl UtxoRpcClientOps for NativeClient { Box::new(rpc_func!(self, "sendrawtransaction", tx).map_to_mm_fut(UtxoRpcError::from)) } + fn blockchain_scripthash_subscribe(&self, _scripthash: String) -> UtxoRpcFut { + Box::new(futures01::future::err( + UtxoRpcError::Internal("blockchain_scripthash_subscribe` is not supported for Native Clients".to_owned()) + .into(), + )) + } + fn get_transaction_bytes(&self, txid: &H256Json) -> UtxoRpcFut { Box::new(self.get_raw_transaction_bytes(txid).map_to_mm_fut(UtxoRpcError::from)) } @@ -1438,6 +1458,7 @@ fn addr_to_socket_addr(input: &str) -> Result { pub fn spawn_electrum( req: &ElectrumRpcRequest, event_handlers: Vec, + scripthash_notification_sender: &ScripthashNotificationSender, abortable_system: AbortableQueue, ) -> Result { let config = match req.protocol { @@ -1465,6 +1486,7 @@ pub fn spawn_electrum( req.url.clone(), config, event_handlers, + scripthash_notification_sender, abortable_system, )) } @@ -1475,6 +1497,7 @@ pub fn spawn_electrum( pub fn spawn_electrum( req: &ElectrumRpcRequest, event_handlers: Vec, + scripthash_notification_sender: &ScripthashNotificationSender, abortable_system: AbortableQueue, ) -> Result { let mut url = req.url.clone(); @@ -1503,7 +1526,13 @@ pub fn spawn_electrum( }, }; - Ok(electrum_connect(url, config, event_handlers, abortable_system)) + Ok(electrum_connect( + url, + config, + event_handlers, + scripthash_notification_sender, + abortable_system, + )) } /// Represents the active Electrum connection to selected address @@ -1611,6 +1640,10 @@ pub struct ElectrumClientImpl { /// Please also note that this abortable system is a subsystem of [`UtxoCoinFields::abortable_system`]. abortable_system: AbortableQueue, negotiate_version: bool, + /// This is used for balance event streaming implementation for UTXOs. + /// If balance event streaming isn't enabled, this value will always be `None`; otherwise, + /// it will be used for sending scripthash messages to trigger re-connections, re-fetching the balances, etc. + pub(crate) scripthash_notification_sender: ScripthashNotificationSender, } async fn electrum_request_multi( @@ -1700,7 +1733,12 @@ impl ElectrumClientImpl { /// Create an Electrum connection and spawn a green thread actor to handle it. pub async fn add_server(&self, req: &ElectrumRpcRequest) -> Result<(), String> { let subsystem = try_s!(self.abortable_system.create_subsystem()); - let connection = try_s!(spawn_electrum(req, self.event_handlers.clone(), subsystem)); + let connection = try_s!(spawn_electrum( + req, + self.event_handlers.clone(), + &self.scripthash_notification_sender, + subsystem, + )); self.connections.lock().await.push(connection); Ok(()) } @@ -1757,6 +1795,13 @@ impl ElectrumClientImpl { .find(|con| con.addr == server_addr) .ok_or(ERRL!("Unknown electrum address {}", server_addr))?; con.set_protocol_version(version).await; + + if let Some(sender) = &self.scripthash_notification_sender { + sender + .unbounded_send(ScripthashNotification::RefreshSubscriptions) + .map_err(|e| ERRL!("Failed sending scripthash message. {}", e))?; + } + Ok(()) } @@ -1787,6 +1832,8 @@ impl Deref for ElectrumClient { const BLOCKCHAIN_HEADERS_SUB_ID: &str = "blockchain.headers.subscribe"; +const BLOCKCHAIN_SCRIPTHASH_SUB_ID: &str = "blockchain.scripthash.subscribe"; + impl UtxoJsonRpcClientInfo for ElectrumClient { fn coin_name(&self) -> &str { self.coin_ticker.as_str() } } @@ -2236,6 +2283,10 @@ impl UtxoRpcClientOps for ElectrumClient { ) } + fn blockchain_scripthash_subscribe(&self, scripthash: String) -> UtxoRpcFut { + Box::new(rpc_func!(self, BLOCKCHAIN_SCRIPTHASH_SUB_ID, scripthash).map_to_mm_fut(UtxoRpcError::from)) + } + /// https://electrumx.readthedocs.io/en/latest/protocol-methods.html#blockchain-transaction-get /// returns transaction bytes by default fn get_transaction_bytes(&self, txid: &H256Json) -> UtxoRpcFut { @@ -2396,6 +2447,7 @@ impl ElectrumClientImpl { block_headers_storage: BlockHeaderStorage, abortable_system: AbortableQueue, negotiate_version: bool, + scripthash_notification_sender: ScripthashNotificationSender, ) -> ElectrumClientImpl { let protocol_version = OrdRange::new(1.2, 1.4).unwrap(); ElectrumClientImpl { @@ -2409,6 +2461,7 @@ impl ElectrumClientImpl { block_headers_storage, abortable_system, negotiate_version, + scripthash_notification_sender, } } @@ -2419,6 +2472,7 @@ impl ElectrumClientImpl { protocol_version: OrdRange, block_headers_storage: BlockHeaderStorage, abortable_system: AbortableQueue, + scripthash_notification_sender: ScripthashNotificationSender, ) -> ElectrumClientImpl { ElectrumClientImpl { protocol_version, @@ -2428,6 +2482,7 @@ impl ElectrumClientImpl { block_headers_storage, abortable_system, false, + scripthash_notification_sender, ) } } @@ -2438,17 +2493,25 @@ fn rx_to_stream(rx: mpsc::Receiver>) -> impl Stream, Erro rx.map_err(|_| panic!("errors not possible on rx")) } -async fn electrum_process_json(raw_json: Json, arc: &JsonRpcPendingRequestsShared) { +async fn electrum_process_json( + raw_json: Json, + arc: &JsonRpcPendingRequestsShared, + scripthash_notification_sender: &ScripthashNotificationSender, +) { // detect if we got standard JSONRPC response or subscription response as JSONRPC request #[derive(Deserialize)] #[serde(untagged)] enum ElectrumRpcResponseEnum { + /// The subscription response as JSONRPC request. + /// + /// NOTE Because JsonRpcResponse uses default values for each of its field, + /// this variant has to stay at top in this enumeration to be properly deserialized + /// from serde. + SubscriptionNotification(JsonRpcRequest), /// The standard JSONRPC single response. SingleResponse(JsonRpcResponse), /// The batch of standard JSONRPC responses. BatchResponses(JsonRpcBatchResponse), - /// The subscription response as JSONRPC request. - SubscriptionNotification(JsonRpcRequest), } let response: ElectrumRpcResponseEnum = match json::from_value(raw_json) { @@ -2465,6 +2528,25 @@ async fn electrum_process_json(raw_json: Json, arc: &JsonRpcPendingRequestsShare ElectrumRpcResponseEnum::SubscriptionNotification(req) => { let id = match req.method.as_ref() { BLOCKCHAIN_HEADERS_SUB_ID => BLOCKCHAIN_HEADERS_SUB_ID, + BLOCKCHAIN_SCRIPTHASH_SUB_ID => { + let scripthash = match req.params.first() { + Some(t) => t.as_str().unwrap_or_default(), + None => { + debug!("Notification must contain the scripthash value."); + return; + }, + }; + + if let Some(sender) = scripthash_notification_sender { + debug!("Sending scripthash message"); + if let Err(e) = sender.unbounded_send(ScripthashNotification::Triggered(scripthash.to_string())) + { + error!("Failed sending scripthash message. {e}"); + return; + }; + }; + BLOCKCHAIN_SCRIPTHASH_SUB_ID + }, _ => { error!("Couldn't get id of request {:?}", req); return; @@ -2487,7 +2569,11 @@ async fn electrum_process_json(raw_json: Json, arc: &JsonRpcPendingRequestsShare } } -async fn electrum_process_chunk(chunk: &[u8], arc: &JsonRpcPendingRequestsShared) { +async fn electrum_process_chunk( + chunk: &[u8], + arc: &JsonRpcPendingRequestsShared, + scripthash_notification_sender: ScripthashNotificationSender, +) { // we should split the received chunk because we can get several responses in 1 chunk. let split = chunk.split(|item| *item == b'\n'); for chunk in split { @@ -2500,7 +2586,7 @@ async fn electrum_process_chunk(chunk: &[u8], arc: &JsonRpcPendingRequestsShared return; }, }; - electrum_process_json(raw_json, arc).await + electrum_process_json(raw_json, arc, &scripthash_notification_sender).await } } } @@ -2629,6 +2715,7 @@ async fn connect_loop( responses: JsonRpcPendingRequestsShared, connection_tx: Arc>>>>, event_handlers: Vec, + scripthash_notification_sender: ScripthashNotificationSender, _spawner: Spawner, ) -> Result<(), ()> { let delay = Arc::new(AtomicU64::new(0)); @@ -2682,6 +2769,7 @@ async fn connect_loop( let delay = delay.clone(); let addr = addr.clone(); let responses = responses.clone(); + let scripthash_notification_sender = scripthash_notification_sender.clone(); let event_handlers = event_handlers.clone(); async move { let mut buffer = String::with_capacity(1024); @@ -2705,7 +2793,7 @@ async fn connect_loop( event_handlers.on_incoming_response(buffer.as_bytes()); last_chunk.store(now_ms(), AtomicOrdering::Relaxed); - electrum_process_chunk(buffer.as_bytes(), &responses).await; + electrum_process_chunk(buffer.as_bytes(), &responses, scripthash_notification_sender.clone()).await; buffer.clear(); } } @@ -2749,6 +2837,7 @@ async fn connect_loop( responses: JsonRpcPendingRequestsShared, connection_tx: Arc>>>>, event_handlers: Vec, + scripthash_notification_sender: ScripthashNotificationSender, spawner: Spawner, ) -> Result<(), ()> { use std::sync::atomic::AtomicUsize; @@ -2783,6 +2872,7 @@ async fn connect_loop( let delay = delay.clone(); let addr = addr.clone(); let responses = responses.clone(); + let scripthash_notification_sender = scripthash_notification_sender.clone(); let event_handlers = event_handlers.clone(); async move { while let Some(incoming_res) = transport_rx.next().await { @@ -2795,7 +2885,7 @@ async fn connect_loop( let incoming_str = incoming_json.to_string(); event_handlers.on_incoming_response(incoming_str.as_bytes()); - electrum_process_json(incoming_json, &responses).await; + electrum_process_json(incoming_json, &responses, &scripthash_notification_sender).await; }, Err(e) => { error!("{} error: {:?}", addr, e); @@ -2855,6 +2945,7 @@ fn electrum_connect( addr: String, config: ElectrumConfig, event_handlers: Vec, + scripthash_notification_sender: &ScripthashNotificationSender, abortable_system: AbortableQueue, ) -> ElectrumConnection { let responses = Arc::new(AsyncMutex::new(JsonRpcPendingRequests::default())); @@ -2867,6 +2958,7 @@ fn electrum_connect( responses.clone(), tx.clone(), event_handlers, + scripthash_notification_sender.clone(), spawner.clone(), ) .then(|_| futures::future::ready(())); diff --git a/mm2src/coins/utxo/utxo_balance_events.rs b/mm2src/coins/utxo/utxo_balance_events.rs new file mode 100644 index 0000000000..7ff6957c3b --- /dev/null +++ b/mm2src/coins/utxo/utxo_balance_events.rs @@ -0,0 +1,195 @@ +use async_trait::async_trait; +use common::{executor::{AbortSettings, SpawnAbortable, Timer}, + log, Future01CompatExt}; +use futures::channel::oneshot::{self, Receiver, Sender}; +use futures_util::StreamExt; +use keys::Address; +use mm2_core::mm_ctx::MmArc; +use mm2_event_stream::{behaviour::{EventBehaviour, EventInitStatus}, + Event, EventStreamConfiguration}; +use std::collections::{BTreeMap, HashSet}; + +use super::utxo_standard::UtxoStandardCoin; +use crate::{utxo::{output_script, + rpc_clients::electrum_script_hash, + utxo_common::{address_balance, address_to_scripthash}, + utxo_tx_history_v2::UtxoTxHistoryOps, + ScripthashNotification, UtxoCoinFields}, + MarketCoinOps, MmCoin}; + +macro_rules! try_or_continue { + ($exp:expr) => { + match $exp { + Ok(t) => t, + Err(e) => { + log::error!("{}", e); + continue; + }, + } + }; +} + +#[async_trait] +impl EventBehaviour for UtxoStandardCoin { + const EVENT_NAME: &'static str = "COIN_BALANCE"; + + // TODO: On certain errors, send an error event to clients (e.g., when not being able to read the + // balance or not being able to subscribe to scripthash/address.). + async fn handle(self, _interval: f64, tx: oneshot::Sender) { + const RECEIVER_DROPPED_MSG: &str = "Receiver is dropped, which should never happen."; + + async fn subscribe_to_addresses( + utxo: &UtxoCoinFields, + addresses: HashSet
, + ) -> Result, String> { + const LOOP_INTERVAL: f64 = 0.5; + + let mut scripthash_to_address_map: BTreeMap = BTreeMap::new(); + for address in addresses { + let scripthash = address_to_scripthash(&address); + + scripthash_to_address_map.insert(scripthash.clone(), address); + + let mut attempt = 0; + while let Err(e) = utxo + .rpc_client + .blockchain_scripthash_subscribe(scripthash.clone()) + .compat() + .await + { + if attempt == 5 { + return Err(e.to_string()); + } + + log::error!( + "Failed to subscribe {} scripthash ({attempt}/5 attempt). Error: {}", + scripthash, + e.to_string() + ); + + attempt += 1; + Timer::sleep(LOOP_INTERVAL).await; + } + } + + Ok(scripthash_to_address_map) + } + + let ctx = match MmArc::from_weak(&self.as_ref().ctx) { + Some(ctx) => ctx, + None => { + let msg = "MM context must have been initialized already."; + tx.send(EventInitStatus::Failed(msg.to_owned())) + .expect(RECEIVER_DROPPED_MSG); + panic!("{}", msg); + }, + }; + + let scripthash_notification_handler = match self.as_ref().scripthash_notification_handler.as_ref() { + Some(t) => t, + None => { + let e = "Scripthash notification receiver can not be empty."; + tx.send(EventInitStatus::Failed(e.to_string())) + .expect(RECEIVER_DROPPED_MSG); + panic!("{}", e); + }, + }; + + tx.send(EventInitStatus::Success).expect(RECEIVER_DROPPED_MSG); + + let mut scripthash_to_address_map = BTreeMap::default(); + while let Some(message) = scripthash_notification_handler.lock().await.next().await { + let notified_scripthash = match message { + ScripthashNotification::Triggered(t) => t, + ScripthashNotification::SubscribeToAddresses(addresses) => { + match subscribe_to_addresses(self.as_ref(), addresses).await { + Ok(map) => scripthash_to_address_map.extend(map), + Err(e) => { + log::error!("{e}"); + }, + }; + + continue; + }, + ScripthashNotification::RefreshSubscriptions => { + let my_addresses = try_or_continue!(self.my_addresses().await); + match subscribe_to_addresses(self.as_ref(), my_addresses).await { + Ok(map) => scripthash_to_address_map = map, + Err(e) => { + log::error!("{e}"); + }, + }; + + continue; + }, + }; + + let address = match scripthash_to_address_map.get(¬ified_scripthash) { + Some(t) => Some(t.clone()), + None => try_or_continue!(self.my_addresses().await) + .into_iter() + .find_map(|addr| { + let script = output_script(&addr, keys::Type::P2PKH); + let script_hash = electrum_script_hash(&script); + let scripthash = hex::encode(script_hash); + + if notified_scripthash == scripthash { + scripthash_to_address_map.insert(notified_scripthash.clone(), addr.clone()); + Some(addr) + } else { + None + } + }), + }; + + let address = match address { + Some(t) => t, + None => { + log::debug!( + "Couldn't find the relevant address for {} scripthash.", + notified_scripthash + ); + continue; + }, + }; + + let balance = try_or_continue!(address_balance(&self, &address).await); + + let payload = json!({ + "ticker": self.ticker(), + "address": address.to_string(), + "balance": { "spendable": balance.spendable, "unspendable": balance.unspendable } + }); + + ctx.stream_channel_controller + .broadcast(Event::new( + Self::EVENT_NAME.to_string(), + json!(vec![payload]).to_string(), + )) + .await; + } + } + + async fn spawn_if_active(self, config: &EventStreamConfiguration) -> EventInitStatus { + if let Some(event) = config.get_event(Self::EVENT_NAME) { + log::info!( + "{} event is activated for {}. `stream_interval_seconds`({}) has no effect on this.", + Self::EVENT_NAME, + self.ticker(), + event.stream_interval_seconds + ); + + let (tx, rx): (Sender, Receiver) = oneshot::channel(); + let fut = self.clone().handle(event.stream_interval_seconds, tx); + let settings = + AbortSettings::info_on_abort(format!("{} event is stopped for {}.", Self::EVENT_NAME, self.ticker())); + self.spawner().spawn_with_settings(fut, settings); + + rx.await.unwrap_or_else(|e| { + EventInitStatus::Failed(format!("Event initialization status must be received: {}", e)) + }) + } else { + EventInitStatus::Inactive + } + } +} diff --git a/mm2src/coins/utxo/utxo_builder/utxo_arc_builder.rs b/mm2src/coins/utxo/utxo_builder/utxo_arc_builder.rs index b97c1aa94e..3c1fc84e9c 100644 --- a/mm2src/coins/utxo/utxo_builder/utxo_arc_builder.rs +++ b/mm2src/coins/utxo/utxo_builder/utxo_arc_builder.rs @@ -3,6 +3,7 @@ use crate::utxo::utxo_block_header_storage::BlockHeaderStorage; use crate::utxo::utxo_builder::{UtxoCoinBuildError, UtxoCoinBuilder, UtxoCoinBuilderCommonOps, UtxoFieldsWithGlobalHDBuilder, UtxoFieldsWithHardwareWalletBuilder, UtxoFieldsWithIguanaSecretBuilder}; +use crate::utxo::utxo_standard::UtxoStandardCoin; use crate::utxo::{generate_and_send_tx, FeePolicy, GetUtxoListOps, UtxoArc, UtxoCommonOps, UtxoSyncStatusLoopHandle, UtxoWeak}; use crate::{DerivationMethod, PrivKeyBuildPolicy, UtxoActivationParams}; @@ -13,6 +14,7 @@ use common::log::{debug, error, info, warn}; use futures::compat::Future01CompatExt; use mm2_core::mm_ctx::MmArc; use mm2_err_handle::prelude::*; +use mm2_event_stream::behaviour::{EventBehaviour, EventInitStatus}; #[cfg(test)] use mocktopus::macros::*; use rand::Rng; use script::Builder; @@ -107,6 +109,7 @@ where let utxo = self.build_utxo_fields().await?; let sync_status_loop_handle = utxo.block_headers_status_notifier.clone(); let spv_conf = utxo.conf.spv_conf.clone(); + let (is_native_mode, mode) = (utxo.rpc_client.is_native(), utxo.rpc_client.to_string()); let utxo_arc = UtxoArc::new(utxo); self.spawn_merge_utxo_loop_if_required(&utxo_arc, self.constructor.clone()); @@ -118,6 +121,18 @@ where spawn_block_header_utxo_loop(self.ticker, &utxo_arc, sync_handle, spv_conf); } + if let Some(stream_config) = &self.ctx().event_stream_configuration { + if is_native_mode { + return MmError::err(UtxoCoinBuildError::UnsupportedModeForBalanceEvents { mode }); + } + + if let EventInitStatus::Failed(err) = + EventBehaviour::spawn_if_active(UtxoStandardCoin::from(utxo_arc), stream_config).await + { + return MmError::err(UtxoCoinBuildError::FailedSpawningBalanceEvents(err)); + } + } + Ok(result_coin) } } diff --git a/mm2src/coins/utxo/utxo_builder/utxo_coin_builder.rs b/mm2src/coins/utxo/utxo_builder/utxo_coin_builder.rs index 16f778d5ff..b3c2d19680 100644 --- a/mm2src/coins/utxo/utxo_builder/utxo_coin_builder.rs +++ b/mm2src/coins/utxo/utxo_builder/utxo_coin_builder.rs @@ -6,8 +6,9 @@ use crate::utxo::tx_cache::{UtxoVerboseCacheOps, UtxoVerboseCacheShared}; use crate::utxo::utxo_block_header_storage::BlockHeaderStorage; use crate::utxo::utxo_builder::utxo_conf_builder::{UtxoConfBuilder, UtxoConfError}; use crate::utxo::{output_script, utxo_common, ElectrumBuilderArgs, ElectrumProtoVerifier, ElectrumProtoVerifierEvent, - RecentlySpentOutPoints, TxFee, UtxoCoinConf, UtxoCoinFields, UtxoHDAccount, UtxoHDWallet, - UtxoRpcMode, UtxoSyncStatus, UtxoSyncStatusLoopHandle, DEFAULT_GAP_LIMIT, UTXO_DUST_AMOUNT}; + RecentlySpentOutPoints, ScripthashNotification, ScripthashNotificationSender, TxFee, UtxoCoinConf, + UtxoCoinFields, UtxoHDAccount, UtxoHDWallet, UtxoRpcMode, UtxoSyncStatus, UtxoSyncStatusLoopHandle, + DEFAULT_GAP_LIMIT, UTXO_DUST_AMOUNT}; use crate::{BlockchainNetwork, CoinTransportMetrics, DerivationMethod, HistorySyncState, IguanaPrivKey, PrivKeyBuildPolicy, PrivKeyPolicy, PrivKeyPolicyNotAllowed, RpcClientType, UtxoActivationParams}; use async_trait::async_trait; @@ -20,7 +21,7 @@ use common::{now_sec, small_rng}; use crypto::{Bip32DerPathError, CryptoCtx, CryptoCtxError, GlobalHDAccountArc, HwWalletType, StandardHDPathError, StandardHDPathToCoin}; use derive_more::Display; -use futures::channel::mpsc::{channel, unbounded, Receiver as AsyncReceiver, UnboundedReceiver}; +use futures::channel::mpsc::{channel, unbounded, Receiver as AsyncReceiver, UnboundedReceiver, UnboundedSender}; use futures::compat::Future01CompatExt; use futures::lock::Mutex as AsyncMutex; use futures::StreamExt; @@ -89,6 +90,12 @@ pub enum UtxoCoinBuildError { #[display(fmt = "SPV params verificaiton failed. Error: {_0}")] SPVError(SPVError), ErrorCalculatingStartingHeight(String), + #[display(fmt = "Failed spawning balance events. Error: {_0}")] + FailedSpawningBalanceEvents(String), + #[display(fmt = "Can not enable balance events for {} mode.", mode)] + UnsupportedModeForBalanceEvents { + mode: String, + }, } impl From for UtxoCoinBuildError { @@ -193,6 +200,25 @@ pub trait UtxoFieldsWithGlobalHDBuilder: UtxoCoinBuilderCommonOps { } } +// The return type is one-time used only. No need to create a type for it. +#[allow(clippy::type_complexity)] +fn get_scripthash_notification_handlers( + ctx: &MmArc, +) -> Option<( + UnboundedSender, + Arc>>, +)> { + if ctx.event_stream_configuration.is_some() { + let (sender, receiver): ( + UnboundedSender, + UnboundedReceiver, + ) = futures::channel::mpsc::unbounded(); + Some((sender, Arc::new(AsyncMutex::new(receiver)))) + } else { + None + } +} + async fn build_utxo_coin_fields_with_conf_and_policy( builder: &Builder, conf: UtxoCoinConf, @@ -215,11 +241,19 @@ where let my_script_pubkey = output_script(&my_address, ScriptType::P2PKH).to_bytes(); let derivation_method = DerivationMethod::SingleAddress(my_address); + let (scripthash_notification_sender, scripthash_notification_handler) = + match get_scripthash_notification_handlers(builder.ctx()) { + Some((sender, receiver)) => (Some(sender), Some(receiver)), + None => (None, None), + }; + // Create an abortable system linked to the `MmCtx` so if the context is stopped via `MmArc::stop`, // all spawned futures related to this `UTXO` coin will be aborted as well. let abortable_system: AbortableQueue = builder.ctx().abortable_system.create_subsystem()?; - let rpc_client = builder.rpc_client(abortable_system.create_subsystem()?).await?; + let rpc_client = builder + .rpc_client(scripthash_notification_sender, abortable_system.create_subsystem()?) + .await?; let tx_fee = builder.tx_fee(&rpc_client).await?; let decimals = builder.decimals(&rpc_client).await?; let dust_amount = builder.dust_amount(); @@ -247,7 +281,10 @@ where block_headers_status_notifier, block_headers_status_watcher, abortable_system, + scripthash_notification_handler, + ctx: builder.ctx().weak(), }; + Ok(coin) } @@ -288,11 +325,19 @@ pub trait UtxoFieldsWithHardwareWalletBuilder: UtxoCoinBuilderCommonOps { gap_limit, }; + let (scripthash_notification_sender, scripthash_notification_handler) = + match get_scripthash_notification_handlers(self.ctx()) { + Some((sender, receiver)) => (Some(sender), Some(receiver)), + None => (None, None), + }; + // Create an abortable system linked to the `MmCtx` so if the context is stopped via `MmArc::stop`, // all spawned futures related to this `UTXO` coin will be aborted as well. let abortable_system: AbortableQueue = self.ctx().abortable_system.create_subsystem()?; - let rpc_client = self.rpc_client(abortable_system.create_subsystem()?).await?; + let rpc_client = self + .rpc_client(scripthash_notification_sender, abortable_system.create_subsystem()?) + .await?; let tx_fee = self.tx_fee(&rpc_client).await?; let decimals = self.decimals(&rpc_client).await?; let dust_amount = self.dust_amount(); @@ -320,6 +365,8 @@ pub trait UtxoFieldsWithHardwareWalletBuilder: UtxoCoinBuilderCommonOps { block_headers_status_notifier, block_headers_status_watcher, abortable_system, + scripthash_notification_handler, + ctx: self.ctx().weak(), }; Ok(coin) } @@ -464,7 +511,11 @@ pub trait UtxoCoinBuilderCommonOps { } } - async fn rpc_client(&self, abortable_system: AbortableQueue) -> UtxoCoinBuildResult { + async fn rpc_client( + &self, + scripthash_notification_sender: ScripthashNotificationSender, + abortable_system: AbortableQueue, + ) -> UtxoCoinBuildResult { match self.activation_params().mode.clone() { UtxoRpcMode::Native => { #[cfg(target_arch = "wasm32")] @@ -479,7 +530,12 @@ pub trait UtxoCoinBuilderCommonOps { }, UtxoRpcMode::Electrum { servers } => { let electrum = self - .electrum_client(abortable_system, ElectrumBuilderArgs::default(), servers) + .electrum_client( + abortable_system, + ElectrumBuilderArgs::default(), + servers, + scripthash_notification_sender, + ) .await?; Ok(UtxoRpcClientEnum::Electrum(electrum)) }, @@ -493,6 +549,7 @@ pub trait UtxoCoinBuilderCommonOps { abortable_system: AbortableQueue, args: ElectrumBuilderArgs, mut servers: Vec, + scripthash_notification_sender: ScripthashNotificationSender, ) -> UtxoCoinBuildResult { let (on_event_tx, on_event_rx) = unbounded(); let ticker = self.ticker().to_owned(); @@ -524,6 +581,7 @@ pub trait UtxoCoinBuilderCommonOps { block_headers_storage, abortable_system, args.negotiate_version, + scripthash_notification_sender, ); for server in servers.iter() { match client.add_server(server).await { diff --git a/mm2src/coins/utxo/utxo_common.rs b/mm2src/coins/utxo/utxo_common.rs index 2354d8ba44..124fa0d4d1 100644 --- a/mm2src/coins/utxo/utxo_common.rs +++ b/mm2src/coins/utxo/utxo_common.rs @@ -5048,6 +5048,30 @@ where refund_htlc_payment(coin, args, SwapPaymentType::TakerPaymentV2).await } +pub fn address_to_scripthash(address: &Address) -> String { + let script = output_script(address, keys::Type::P2PKH); + let script_hash = electrum_script_hash(&script); + hex::encode(script_hash) +} + +pub async fn utxo_prepare_addresses_for_balance_stream_if_enabled( + coin: &T, + addresses: HashSet
, +) -> MmResult<(), String> +where + T: UtxoCommonOps, +{ + if let UtxoRpcClientEnum::Electrum(electrum_client) = &coin.as_ref().rpc_client { + if let Some(sender) = &electrum_client.scripthash_notification_sender { + sender + .unbounded_send(ScripthashNotification::SubscribeToAddresses(addresses)) + .map_err(|e| ERRL!("Failed sending scripthash message. {}", e))?; + } + }; + + Ok(()) +} + #[test] fn test_increase_by_percent() { assert_eq!(increase_by_percent(4300, 1.), 4343); @@ -5148,3 +5172,21 @@ fn test_generate_taker_fee_tx_outputs_with_burn() { assert_eq!(outputs[1].value, burn_uamount); } + +#[test] +fn test_address_to_scripthash() { + let address = Address::from("RMGJ9tRST45RnwEKHPGgBLuY3moSYP7Mhk"); + let actual = address_to_scripthash(&address); + let expected = "e850499408c6ebcf6b3340282747e540fb23748429fca5f2b36cdeef54ddf5b1".to_owned(); + assert_eq!(expected, actual); + + let address = Address::from("R9o9xTocqr6CeEDGDH6mEYpwLoMz6jNjMW"); + let actual = address_to_scripthash(&address); + let expected = "a70a7a7041ef172ce4b5f8208aabed44c81e2af75493540f50af7bd9afa9955d".to_owned(); + assert_eq!(expected, actual); + + let address = Address::from("qcyBHeSct7Wr4mAw18iuQ1zW5mMFYmtmBE"); + let actual = address_to_scripthash(&address); + let expected = "c5b5922c86830289231539d1681d8ce621aac8326c96d6ac55400b4d1485f769".to_owned(); + assert_eq!(expected, actual); +} diff --git a/mm2src/coins/utxo/utxo_common_tests.rs b/mm2src/coins/utxo/utxo_common_tests.rs index 00410001e3..f4d9adc2c6 100644 --- a/mm2src/coins/utxo/utxo_common_tests.rs +++ b/mm2src/coins/utxo/utxo_common_tests.rs @@ -138,6 +138,8 @@ pub(super) fn utxo_coin_fields_for_test( block_headers_status_notifier: None, block_headers_status_watcher: None, abortable_system: AbortableQueue::default(), + scripthash_notification_handler: None, + ctx: Default::default(), } } diff --git a/mm2src/coins/utxo/utxo_standard.rs b/mm2src/coins/utxo/utxo_standard.rs index ab809ccfc5..d176d969dd 100644 --- a/mm2src/coins/utxo/utxo_standard.rs +++ b/mm2src/coins/utxo/utxo_standard.rs @@ -1,3 +1,4 @@ +use super::utxo_common::utxo_prepare_addresses_for_balance_stream_if_enabled; use super::*; use crate::coin_balance::{self, EnableCoinBalanceError, EnabledCoinBalanceParams, HDAccountBalance, HDAddressBalance, HDWalletBalance, HDWalletBalanceOps}; @@ -78,6 +79,7 @@ pub async fn utxo_standard_coin_with_policy( .build() .await ); + Ok(coin) } @@ -1035,6 +1037,13 @@ impl HDWalletBalanceOps for UtxoStandardCoin { ) -> BalanceResult> { utxo_common::addresses_balances(self, addresses).await } + + async fn prepare_addresses_for_balance_stream_if_enabled( + &self, + addresses: HashSet, + ) -> MmResult<(), String> { + utxo_prepare_addresses_for_balance_stream_if_enabled(self, addresses).await + } } impl HDWalletCoinWithStorageOps for UtxoStandardCoin { diff --git a/mm2src/coins/utxo/utxo_tests.rs b/mm2src/coins/utxo/utxo_tests.rs index 88d60142bd..de8bc91e31 100644 --- a/mm2src/coins/utxo/utxo_tests.rs +++ b/mm2src/coins/utxo/utxo_tests.rs @@ -84,7 +84,7 @@ pub fn electrum_client_for_test(servers: &[&str]) -> ElectrumClient { let servers = servers.into_iter().map(|s| json::from_value(s).unwrap()).collect(); let abortable_system = AbortableQueue::default(); - block_on(builder.electrum_client(abortable_system, args, servers)).unwrap() + block_on(builder.electrum_client(abortable_system, args, servers, None)).unwrap() } /// Returned client won't work by default, requires some mocks to be usable @@ -469,6 +469,7 @@ fn test_wait_for_payment_spend_timeout_electrum() { block_headers_storage, abortable_system, true, + None, ); let client = UtxoRpcClientEnum::Electrum(ElectrumClient(Arc::new(client))); let coin = utxo_coin_for_test(client, None, false); @@ -1482,13 +1483,14 @@ fn test_network_info_negative_time_offset() { #[test] fn test_unavailable_electrum_proto_version() { ElectrumClientImpl::new.mock_safe( - |coin_ticker, event_handlers, block_headers_storage, abortable_system, _| { + |coin_ticker, event_handlers, block_headers_storage, abortable_system, _, _| { MockResult::Return(ElectrumClientImpl::with_protocol_version( coin_ticker, event_handlers, OrdRange::new(1.8, 1.9).unwrap(), block_headers_storage, abortable_system, + None, )) }, ); @@ -3781,6 +3783,8 @@ fn test_scan_for_new_addresses() { let client = NativeClient(Arc::new(NativeClientImpl::default())); let mut fields = utxo_coin_fields_for_test(UtxoRpcClientEnum::Native(client), None, false); + let ctx = MmCtxBuilder::new().into_mm_arc(); + fields.ctx = ctx.weak(); let mut hd_accounts = HDAccountsMap::new(); hd_accounts.insert(0, UtxoHDAccount { account_id: 0, @@ -3920,6 +3924,8 @@ fn test_get_new_address() { let client = NativeClient(Arc::new(NativeClientImpl::default())); let mut fields = utxo_coin_fields_for_test(UtxoRpcClientEnum::Native(client), None, false); + let ctx = MmCtxBuilder::new().into_mm_arc(); + fields.ctx = ctx.weak(); let mut hd_accounts = HDAccountsMap::new(); let hd_account_for_test = UtxoHDAccount { account_id: 0, diff --git a/mm2src/coins/utxo/utxo_wasm_tests.rs b/mm2src/coins/utxo/utxo_wasm_tests.rs index 35902f4ff0..9c11359d73 100644 --- a/mm2src/coins/utxo/utxo_wasm_tests.rs +++ b/mm2src/coins/utxo/utxo_wasm_tests.rs @@ -41,7 +41,10 @@ pub async fn electrum_client_for_test(servers: &[&str]) -> ElectrumClient { let servers = servers.into_iter().map(|s| json::from_value(s).unwrap()).collect(); let abortable_system = AbortableQueue::default(); - builder.electrum_client(abortable_system, args, servers).await.unwrap() + builder + .electrum_client(abortable_system, args, servers, None) + .await + .unwrap() } #[wasm_bindgen_test]