diff --git a/Cargo.lock b/Cargo.lock index 14af4d5dc6..0d94db7649 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1003,6 +1003,7 @@ dependencies = [ "ethkey", "futures 0.1.29", "futures 0.3.28", + "futures-ticker", "futures-util", "group 0.8.0", "gstuff", @@ -1161,6 +1162,7 @@ dependencies = [ "primitive-types", "rand 0.7.3", "regex", + "rustc-hash", "ser_error", "ser_error_derive", "serde", diff --git a/mm2src/coins/Cargo.toml b/mm2src/coins/Cargo.toml index 4ce5597b4f..d957357ecb 100644 --- a/mm2src/coins/Cargo.toml +++ b/mm2src/coins/Cargo.toml @@ -49,9 +49,9 @@ ethereum-types = { version = "0.13", default-features = false, features = ["std" ethkey = { git = "https://github.com/KomodoPlatform/mm2-parity-ethereum.git" } # Waiting for https://github.com/rust-lang/rust/issues/54725 to use on Stable. #enum_dispatch = "0.1" -tokio-tungstenite-wasm = { git = "https://github.com/KomodoPlatform/tokio-tungstenite-wasm", rev = "d20abdb", features = ["rustls-tls-native-roots"]} futures01 = { version = "0.1", package = "futures" } futures-util = { version = "0.3", default-features = false, features = ["sink", "std"] } +futures-ticker = "0.0.3" # using select macro requires the crate to be named futures, compilation failed with futures03 name futures = { version = "0.3", package = "futures", features = ["compat", "async-await"] } group = "0.8.0" @@ -100,6 +100,7 @@ sha3 = "0.9" utxo_signer = { path = "utxo_signer" } # using the same version as cosmrs tendermint-rpc = { version = "=0.23.7", default-features = false } +tokio-tungstenite-wasm = { git = "https://github.com/KomodoPlatform/tokio-tungstenite-wasm", rev = "d20abdb", features = ["rustls-tls-native-roots"]} tiny-bip39 = "0.8.0" url = { version = "2.2.2", features = ["serde"] } uuid = { version = "1.2.2", features = ["fast-rng", "serde", "v4"] } diff --git a/mm2src/coins/eth.rs b/mm2src/coins/eth.rs index 8a16949695..27f1c10c3f 100644 --- a/mm2src/coins/eth.rs +++ b/mm2src/coins/eth.rs @@ -21,14 +21,17 @@ // Copyright © 2023 Pampex LTD and TillyHK LTD. All rights reserved. // use super::eth::Action::{Call, Create}; +use crate::eth::eth_rpc::ETH_RPC_REQUEST_TIMEOUT; +use crate::eth::web3_transport::websocket_transport::{WebsocketTransport, WebsocketTransportNode}; use crate::lp_price::get_base_price_in_rel; use crate::nft::nft_structs::{ContractType, ConvertChain, TransactionNftDetails, WithdrawErc1155, WithdrawErc721}; -use crate::{DexFee, ValidateWatcherSpendInput, WatcherSpendType}; +use crate::{DexFee, RpcCommonOps, ValidateWatcherSpendInput, WatcherSpendType}; use async_trait::async_trait; use bitcrypto::{dhash160, keccak256, ripemd160, sha256}; use common::custom_futures::repeatable::{Ready, Retry, RetryOnError}; use common::custom_futures::timeout::FutureTimerExt; use common::executor::{abortable_queue::AbortableQueue, AbortableSystem, AbortedError, Timer}; +use common::executor::{AbortSettings, SpawnAbortable}; use common::log::{debug, error, info, warn}; use common::number_type_casting::SafeTypeCastingNumbers; use common::{get_utc_timestamp, now_sec, small_rng, DEX_FEE_ADDR_RAW_PUBKEY}; @@ -47,7 +50,8 @@ use ethkey::{sign, verify_address}; use futures::compat::Future01CompatExt; use futures::future::{join_all, select_ok, try_join_all, Either, FutureExt, TryFutureExt}; use futures01::Future; -use http::StatusCode; +use http::{StatusCode, Uri}; +use instant::Instant; use mm2_core::mm_ctx::{MmArc, MmWeak}; use mm2_err_handle::prelude::*; use mm2_event_stream::behaviour::{EventBehaviour, EventInitStatus}; @@ -69,10 +73,11 @@ use std::ops::Deref; use std::str::FromStr; use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering}; use std::sync::{Arc, Mutex}; +use std::time::Duration; use web3::types::{Action as TraceAction, BlockId, BlockNumber, Bytes, CallRequest, FilterBuilder, Log, Trace, TraceFilterBuilder, Transaction as Web3Transaction, TransactionId, U64}; use web3::{self, Web3}; -use web3_transport::{http_transport::HttpTransportNode, EthFeeHistoryNamespace, Web3Transport}; +use web3_transport::{http_transport::HttpTransportNode, Web3Transport}; cfg_wasm32! { use crypto::MetamaskArc; @@ -104,6 +109,7 @@ use super::{coin_conf, lp_coinfind_or_err, AsyncMutex, BalanceError, BalanceFut, pub use rlp; mod eth_balance_events; +mod eth_rpc; #[cfg(test)] mod eth_tests; #[cfg(target_arch = "wasm32")] mod eth_wasm_tests; mod web3_transport; @@ -426,9 +432,7 @@ pub struct EthCoinImpl { swap_contract_address: Address, fallback_swap_contract: Option
, contract_supports_watchers: bool, - pub(crate) web3: Web3, - /// The separate web3 instances kept to get nonce, will replace the web3 completely soon - web3_instances: Vec, + web3_instances: AsyncMutex>, decimals: u8, gas_station_url: Option, gas_station_decimals: u8, @@ -487,67 +491,6 @@ async fn make_gas_station_request(url: &str) -> GasStationResult { } impl EthCoinImpl { - /// Gets Transfer events from ERC20 smart contract `addr` between `from_block` and `to_block` - fn erc20_transfer_events( - &self, - contract: Address, - from_addr: Option
, - to_addr: Option
, - from_block: BlockNumber, - to_block: BlockNumber, - limit: Option, - ) -> Box, Error = String> + Send> { - let contract_event = try_fus!(ERC20_CONTRACT.event("Transfer")); - let topic0 = Some(vec![contract_event.signature()]); - let topic1 = from_addr.map(|addr| vec![addr.into()]); - let topic2 = to_addr.map(|addr| vec![addr.into()]); - let mut filter = FilterBuilder::default() - .topics(topic0, topic1, topic2, None) - .from_block(from_block) - .to_block(to_block) - .address(vec![contract]); - - if let Some(l) = limit { - filter = filter.limit(l); - } - - Box::new( - self.web3 - .eth() - .logs(filter.build()) - .compat() - .map_err(|e| ERRL!("{}", e)), - ) - } - - /// Gets ETH traces from ETH node between addresses in `from_block` and `to_block` - fn eth_traces( - &self, - from_addr: Vec
, - to_addr: Vec
, - from_block: BlockNumber, - to_block: BlockNumber, - limit: Option, - ) -> Box, Error = String> + Send> { - let mut filter = TraceFilterBuilder::default() - .from_address(from_addr) - .to_address(to_addr) - .from_block(from_block) - .to_block(to_block); - - if let Some(l) = limit { - filter = filter.count(l); - } - - Box::new( - self.web3 - .trace() - .filter(filter.build()) - .compat() - .map_err(|e| ERRL!("{}", e)), - ) - } - #[cfg(not(target_arch = "wasm32"))] fn eth_traces_path(&self, ctx: &MmArc) -> PathBuf { ctx.dbdir() @@ -644,24 +587,6 @@ impl EthCoinImpl { sha256(&input).to_vec() } - /// Gets `SenderRefunded` events from etomic swap smart contract since `from_block` - fn refund_events( - &self, - swap_contract_address: Address, - from_block: u64, - to_block: u64, - ) -> Box, Error = String> + Send> { - let contract_event = try_fus!(SWAP_CONTRACT.event("SenderRefunded")); - let filter = FilterBuilder::default() - .topics(Some(vec![contract_event.signature()]), None, None, None) - .from_block(BlockNumber::Number(from_block.into())) - .to_block(BlockNumber::Number(to_block.into())) - .address(vec![swap_contract_address]) - .build(); - - Box::new(self.web3.eth().logs(filter).compat().map_err(|e| ERRL!("{}", e))) - } - /// Try to parse address from string. pub fn address_from_str(&self, address: &str) -> Result { Ok(try_s!(valid_addr_from_str(address))) @@ -698,8 +623,6 @@ async fn get_raw_transaction_impl(coin: EthCoin, req: RawTransactionRequest) -> async fn get_tx_hex_by_hash_impl(coin: EthCoin, tx_hash: H256) -> RawTransactionResult { let web3_tx = coin - .web3 - .eth() .transaction(TransactionId::Hash(tx_hash)) .await? .or_mm_err(|| RawTransactionError::HashNotExist(tx_hash.to_string()))?; @@ -780,7 +703,9 @@ async fn withdraw_impl(coin: EthCoin, req: WithdrawRequest) -> WithdrawResult { EthPrivKeyPolicy::Iguana(_) | EthPrivKeyPolicy::HDWallet { .. } => { // Todo: nonce_lock is still global for all addresses but this needs to be per address let _nonce_lock = coin.nonce_lock.lock().await; - let (nonce, _) = get_addr_nonce(my_address, coin.web3_instances.clone()) + let (nonce, _) = coin + .clone() + .get_addr_nonce(my_address) .compat() .timeout_secs(30.) .await? @@ -829,7 +754,7 @@ async fn withdraw_impl(coin: EthCoin, req: WithdrawRequest) -> WithdrawResult { // Please note that this method may take a long time // due to `wallet_switchEthereumChain` and `eth_sendTransaction` requests. - let tx_hash = coin.web3.eth().send_transaction(tx_to_send).await?; + let tx_hash = coin.send_transaction(tx_to_send).await?; let signed_tx = coin .wait_for_tx_appears_on_rpc(tx_hash, wait_rpc_timeout, check_every) @@ -934,7 +859,9 @@ pub async fn withdraw_erc1155(ctx: MmArc, withdraw_type: WithdrawErc1155) -> Wit ) .await?; let _nonce_lock = eth_coin.nonce_lock.lock().await; - let (nonce, _) = get_addr_nonce(eth_coin.my_address, eth_coin.web3_instances.clone()) + let (nonce, _) = eth_coin + .clone() + .get_addr_nonce(eth_coin.my_address) .compat() .timeout_secs(30.) .await? @@ -1018,7 +945,9 @@ pub async fn withdraw_erc721(ctx: MmArc, withdraw_type: WithdrawErc721) -> Withd ) .await?; let _nonce_lock = eth_coin.nonce_lock.lock().await; - let (nonce, _) = get_addr_nonce(my_address, eth_coin.web3_instances.clone()) + let (nonce, _) = eth_coin + .clone() + .get_addr_nonce(my_address) .compat() .timeout_secs(30.) .await? @@ -1190,8 +1119,6 @@ impl SwapOps for EthCoin { Some(event) => { let transaction = try_s!( selfi - .web3 - .eth() .transaction(TransactionId::Hash(event.transaction_hash.unwrap())) .await ); @@ -1748,7 +1675,7 @@ impl WatcherOps for EthCoin { let fallback_swap_contract = self.fallback_swap_contract; let fut = async move { - let tx_from_rpc = selfi.web3.eth().transaction(TransactionId::Hash(tx.hash)).await?; + let tx_from_rpc = selfi.transaction(TransactionId::Hash(tx.hash)).await?; let tx_from_rpc = tx_from_rpc.as_ref().ok_or_else(|| { ValidatePaymentError::TxDoesNotExist(format!("Didn't find provided tx {:?} on ETH node", tx)) @@ -2156,25 +2083,34 @@ impl MarketCoinOps for EthCoin { tx = &tx[2..]; } let bytes = try_fus!(hex::decode(tx)); - Box::new( - self.web3 - .eth() + + let coin = self.clone(); + + let fut = async move { + let result = coin .send_raw_transaction(bytes.into()) - .compat() + .await .map(|res| format!("{:02x}", res)) - .map_err(|e| ERRL!("{}", e)), - ) + .map_err(|e| ERRL!("{}", e)); + + result + }; + + Box::new(fut.boxed().compat()) } fn send_raw_tx_bytes(&self, tx: &[u8]) -> Box + Send> { - Box::new( - self.web3 - .eth() - .send_raw_transaction(tx.into()) - .compat() + let coin = self.clone(); + + let tx = tx.to_owned(); + let fut = async move { + coin.send_raw_transaction(tx.into()) + .await .map(|res| format!("{:02x}", res)) - .map_err(|e| ERRL!("{}", e)), - ) + .map_err(|e| ERRL!("{}", e)) + }; + + Box::new(fut.boxed().compat()) } async fn sign_raw_tx(&self, args: &SignRawTransactionRequest) -> RawTransactionResult { @@ -2344,7 +2280,7 @@ impl MarketCoinOps for EthCoin { if let Some(event) = found { if let Some(tx_hash) = event.transaction_hash { - let transaction = match selfi.web3.eth().transaction(TransactionId::Hash(tx_hash)).await { + let transaction = match selfi.transaction(TransactionId::Hash(tx_hash)).await { Ok(Some(t)) => t, Ok(None) => { info!("Tx {} not found yet", tx_hash); @@ -2375,14 +2311,16 @@ impl MarketCoinOps for EthCoin { } fn current_block(&self) -> Box + Send> { - Box::new( - self.web3 - .eth() - .block_number() - .compat() + let coin = self.clone(); + + let fut = async move { + coin.block_number() + .await .map(|res| res.as_u64()) - .map_err(|e| ERRL!("{}", e)), - ) + .map_err(|e| ERRL!("{}", e)) + }; + + Box::new(fut.boxed().compat()) } fn display_priv_key(&self) -> Result { @@ -2439,11 +2377,8 @@ async fn sign_transaction_with_keypair( } let _nonce_lock = coin.nonce_lock.lock().await; status.status(tags!(), "get_addr_nonce…"); - let (nonce, web3_instances_with_latest_nonce) = try_tx_s!( - get_addr_nonce(coin.my_address, coin.web3_instances.clone()) - .compat() - .await - ); + let (nonce, web3_instances_with_latest_nonce) = + try_tx_s!(coin.clone().get_addr_nonce(coin.my_address).compat().await); status.status(tags!(), "get_gas_price…"); let gas_price = try_tx_s!(coin.get_gas_price().compat().await); @@ -2526,7 +2461,7 @@ async fn sign_and_send_transaction_with_metamask( // Please note that this method may take a long time // due to `wallet_switchEthereumChain` and `eth_sendTransaction` requests. - let tx_hash = try_tx_s!(coin.web3.eth().send_transaction(tx_to_send).await); + let tx_hash = try_tx_s!(coin.send_transaction(tx_to_send).await); let maybe_signed_tx = try_tx_s!( coin.wait_for_tx_appears_on_rpc(tx_hash, wait_rpc_timeout, check_every) @@ -2577,7 +2512,147 @@ async fn sign_raw_eth_tx(coin: &EthCoin, args: &SignEthTransactionParams) -> Raw } } +#[async_trait] +impl RpcCommonOps for EthCoin { + type RpcClient = Web3Instance; + type Error = Web3RpcError; + + async fn get_live_client(&self) -> Result { + let mut clients = self.web3_instances.lock().await; + + // try to find first live client + for (i, client) in clients.clone().into_iter().enumerate() { + if let Web3Transport::Websocket(socket_transport) = &client.web3.transport() { + socket_transport.maybe_spawn_connection_loop(self.clone()); + }; + + if !client.web3.transport().is_last_request_failed() { + // Bring the live client to the front of rpc_clients + clients.rotate_left(i); + return Ok(client); + } + + match client + .web3 + .web3() + .client_version() + .timeout(ETH_RPC_REQUEST_TIMEOUT) + .await + { + Ok(Ok(_)) => { + // Bring the live client to the front of rpc_clients + clients.rotate_left(i); + return Ok(client); + }, + Ok(Err(rpc_error)) => { + debug!("Could not get client version on: {:?}. Error: {}", &client, rpc_error); + + if let Web3Transport::Websocket(socket_transport) = client.web3.transport() { + socket_transport.stop_connection_loop().await; + }; + }, + Err(timeout_error) => { + debug!( + "Client version timeout exceed on: {:?}. Error: {}", + &client, timeout_error + ); + + if let Web3Transport::Websocket(socket_transport) = client.web3.transport() { + socket_transport.stop_connection_loop().await; + }; + }, + }; + } + + return Err(Web3RpcError::Transport( + "All the current rpc nodes are unavailable.".to_string(), + )); + } +} + impl EthCoin { + pub(crate) async fn web3(&self) -> Result, Web3RpcError> { + self.get_live_client().await.map(|t| t.web3) + } + + /// Gets `SenderRefunded` events from etomic swap smart contract since `from_block` + fn refund_events( + &self, + swap_contract_address: Address, + from_block: u64, + to_block: u64, + ) -> Box, Error = String> + Send> { + let contract_event = try_fus!(SWAP_CONTRACT.event("SenderRefunded")); + let filter = FilterBuilder::default() + .topics(Some(vec![contract_event.signature()]), None, None, None) + .from_block(BlockNumber::Number(from_block.into())) + .to_block(BlockNumber::Number(to_block.into())) + .address(vec![swap_contract_address]) + .build(); + + let coin = self.clone(); + + let fut = async move { coin.logs(filter).await.map_err(|e| ERRL!("{}", e)) }; + + Box::new(fut.boxed().compat()) + } + + /// Gets ETH traces from ETH node between addresses in `from_block` and `to_block` + fn eth_traces( + &self, + from_addr: Vec
, + to_addr: Vec
, + from_block: BlockNumber, + to_block: BlockNumber, + limit: Option, + ) -> Box, Error = String> + Send> { + let mut filter = TraceFilterBuilder::default() + .from_address(from_addr) + .to_address(to_addr) + .from_block(from_block) + .to_block(to_block); + + if let Some(l) = limit { + filter = filter.count(l); + } + + let coin = self.clone(); + + let fut = async move { coin.trace_filter(filter.build()).await.map_err(|e| ERRL!("{}", e)) }; + Box::new(fut.boxed().compat()) + } + + /// Gets Transfer events from ERC20 smart contract `addr` between `from_block` and `to_block` + fn erc20_transfer_events( + &self, + contract: Address, + from_addr: Option
, + to_addr: Option
, + from_block: BlockNumber, + to_block: BlockNumber, + limit: Option, + ) -> Box, Error = String> + Send> { + let contract_event = try_fus!(ERC20_CONTRACT.event("Transfer")); + let topic0 = Some(vec![contract_event.signature()]); + let topic1 = from_addr.map(|addr| vec![addr.into()]); + let topic2 = to_addr.map(|addr| vec![addr.into()]); + let mut filter = FilterBuilder::default() + .topics(topic0, topic1, topic2, None) + .from_block(from_block) + .to_block(to_block) + .address(vec![contract]); + + if let Some(l) = limit { + filter = filter.limit(l); + } + + let coin = self.clone(); + + let fut = async move { coin.logs(filter.build()).await.map_err(|e| ERRL!("{}", e)) }; + + Box::new(fut.boxed().compat()) + } + /// Downloads and saves ETH transaction history of my_address, relies on Parity trace_filter API /// https://wiki.parity.io/JSONRPC-trace-module#trace_filter, this requires tracing to be enabled /// in node config. Other ETH clients (Geth, etc.) are `not` supported (yet). @@ -2603,7 +2678,7 @@ impl EthCoin { }; } - let current_block = match self.web3.eth().block_number().await { + let current_block = match self.block_number().await { Ok(block) => block, Err(e) => { ctx.log.log( @@ -2786,8 +2861,6 @@ impl EthCoin { mm_counter!(ctx.metrics, "tx.history.request.count", 1, "coin" => self.ticker.clone(), "method" => "tx_detail_by_hash"); let web3_tx = match self - .web3 - .eth() .transaction(TransactionId::Hash(trace.transaction_hash.unwrap())) .await { @@ -2819,12 +2892,7 @@ impl EthCoin { mm_counter!(ctx.metrics, "tx.history.response.count", 1, "coin" => self.ticker.clone(), "method" => "tx_detail_by_hash"); - let receipt = match self - .web3 - .eth() - .transaction_receipt(trace.transaction_hash.unwrap()) - .await - { + let receipt = match self.transaction_receipt(trace.transaction_hash.unwrap()).await { Ok(r) => r, Err(e) => { ctx.log.log( @@ -2878,8 +2946,6 @@ impl EthCoin { let raw = signed_tx_from_web3_tx(web3_tx).unwrap(); let block = match self - .web3 - .eth() .block(BlockId::Number(BlockNumber::Number(trace.block_number.into()))) .await { @@ -2962,7 +3028,7 @@ impl EthCoin { }; } - let current_block = match self.web3.eth().block_number().await { + let current_block = match self.block_number().await { Ok(block) => block, Err(e) => { ctx.log.log( @@ -3165,8 +3231,6 @@ impl EthCoin { "coin" => self.ticker.clone(), "client" => "ethereum", "method" => "tx_detail_by_hash"); let web3_tx = match self - .web3 - .eth() .transaction(TransactionId::Hash(event.transaction_hash.unwrap())) .await { @@ -3200,12 +3264,7 @@ impl EthCoin { }, }; - let receipt = match self - .web3 - .eth() - .transaction_receipt(event.transaction_hash.unwrap()) - .await - { + let receipt = match self.transaction_receipt(event.transaction_hash.unwrap()).await { Ok(r) => r, Err(e) => { ctx.log.log( @@ -3236,12 +3295,7 @@ impl EthCoin { None => None, }; let block_number = event.block_number.unwrap(); - let block = match self - .web3 - .eth() - .block(BlockId::Number(BlockNumber::Number(block_number))) - .await - { + let block = match self.block(BlockId::Number(BlockNumber::Number(block_number))).await { Ok(Some(b)) => b, Ok(None) => { ctx.log.log( @@ -3983,7 +4037,7 @@ impl EthCoin { let coin = self.clone(); let fut = async move { match coin.coin_type { - EthCoinType::Eth => Ok(coin.web3.eth().balance(address, Some(BlockNumber::Latest)).await?), + EthCoinType::Eth => Ok(coin.balance(address, Some(BlockNumber::Latest)).await?), EthCoinType::Erc20 { ref token_addr, .. } => { let function = ERC20_CONTRACT.function("balanceOf")?; let data = function.encode_input(&[Token::Address(address)])?; @@ -4090,9 +4144,13 @@ impl EthCoin { Ok(owner_address) } - fn estimate_gas(&self, req: CallRequest) -> Box + Send> { + fn estimate_gas_wrapper(&self, req: CallRequest) -> Box + Send> { + let coin = self.clone(); + // always using None block number as old Geth version accept only single argument in this RPC - Box::new(self.web3.eth().estimate_gas(req, None).compat()) + let fut = async move { coin.estimate_gas(req, None).await }; + + Box::new(fut.boxed().compat()) } /// Estimates how much gas is necessary to allow the contract call to complete. @@ -4120,18 +4178,16 @@ impl EthCoin { gas_price: Some(gas_price), ..CallRequest::default() }; - coin.estimate_gas(estimate_gas_req).map_to_mm_fut(Web3RpcError::from) + coin.estimate_gas_wrapper(estimate_gas_req) + .map_to_mm_fut(Web3RpcError::from) })) } fn eth_balance(&self) -> BalanceFut { - Box::new( - self.web3 - .eth() - .balance(self.my_address, Some(BlockNumber::Latest)) - .compat() - .map_to_mm_fut(BalanceError::from), - ) + let coin = self.clone(); + + let fut = async move { coin.balance(coin.my_address, Some(BlockNumber::Latest)).await }; + Box::new(fut.boxed().compat().map_to_mm_fut(BalanceError::from)) } async fn call_request(&self, to: Address, value: Option, data: Option) -> Result { @@ -4145,10 +4201,7 @@ impl EthCoin { ..CallRequest::default() }; - self.web3 - .eth() - .call(request, Some(BlockId::Number(BlockNumber::Latest))) - .await + self.call(request, Some(BlockId::Number(BlockNumber::Latest))).await } fn allowance(&self, spender: Address) -> Web3RpcFut { @@ -4250,7 +4303,10 @@ impl EthCoin { .address(vec![swap_contract_address]) .build(); - Box::new(self.web3.eth().logs(filter).compat().map_err(|e| ERRL!("{}", e))) + let coin = self.clone(); + + let fut = async move { coin.logs(filter).await.map_err(|e| ERRL!("{}", e)) }; + Box::new(fut.boxed().compat()) } /// Gets `ReceiverSpent` events from etomic swap smart contract since `from_block` @@ -4268,7 +4324,10 @@ impl EthCoin { .address(vec![swap_contract_address]) .build(); - Box::new(self.web3.eth().logs(filter).compat().map_err(|e| ERRL!("{}", e))) + let coin = self.clone(); + + let fut = async move { coin.logs(filter).await.map_err(|e| ERRL!("{}", e)) }; + Box::new(fut.boxed().compat()) } fn validate_payment(&self, input: ValidatePaymentInput) -> ValidatePaymentFut<()> { @@ -4309,7 +4368,7 @@ impl EthCoin { ))); } - let tx_from_rpc = selfi.web3.eth().transaction(TransactionId::Hash(tx.hash)).await?; + let tx_from_rpc = selfi.transaction(TransactionId::Hash(tx.hash)).await?; let tx_from_rpc = tx_from_rpc.as_ref().ok_or_else(|| { ValidatePaymentError::TxDoesNotExist(format!("Didn't find provided tx {:?} on ETH node", tx.hash)) })?; @@ -4623,8 +4682,7 @@ impl EthCoin { if let Some(event) = found { match event.transaction_hash { Some(tx_hash) => { - let transaction = match try_s!(self.web3.eth().transaction(TransactionId::Hash(tx_hash)).await) - { + let transaction = match try_s!(self.transaction(TransactionId::Hash(tx_hash)).await) { Some(t) => t, None => { return ERR!("Found ReceiverSpent event, but transaction {:02x} is missing", tx_hash) @@ -4649,8 +4707,7 @@ impl EthCoin { if let Some(event) = found { match event.transaction_hash { Some(tx_hash) => { - let transaction = match try_s!(self.web3.eth().transaction(TransactionId::Hash(tx_hash)).await) - { + let transaction = match try_s!(self.transaction(TransactionId::Hash(tx_hash)).await) { Some(t) => t, None => { return ERR!("Found SenderRefunded event, but transaction {:02x} is missing", tx_hash) @@ -4708,7 +4765,7 @@ impl EthCoin { None => None, }; - let eth_gas_price = match coin.web3.eth().gas_price().await { + let eth_gas_price = match coin.gas_price().await { Ok(eth_gas) => Some(eth_gas), Err(e) => { error!("Error {} on eth_gasPrice request", e); @@ -4716,11 +4773,7 @@ impl EthCoin { }, }; - let fee_history_namespace: EthFeeHistoryNamespace<_> = coin.web3.api(); - let eth_fee_history_price = match fee_history_namespace - .eth_fee_history(U256::from(1u64), BlockNumber::Latest, &[]) - .await - { + let eth_fee_history_price = match coin.eth_fee_history(U256::from(1u64), BlockNumber::Latest, &[]).await { Ok(res) => res .base_fee_per_gas .first() @@ -4753,7 +4806,7 @@ impl EthCoin { /// The function is endless, we just keep looping in case of a transport error hoping it will go away. async fn wait_for_addr_nonce_increase(&self, addr: Address, prev_nonce: U256) { repeatable!(async { - match get_addr_nonce(addr, self.web3_instances.clone()).compat().await { + match self.clone().get_addr_nonce(addr).compat().await { Ok((new_nonce, _)) if new_nonce > prev_nonce => Ready(()), Ok((_nonce, _)) => Retry(()), Err(e) => { @@ -4778,7 +4831,7 @@ impl EthCoin { ) -> Web3RpcResult> { let wait_until = wait_until_ms(wait_rpc_timeout_ms); while now_ms() < wait_until { - let maybe_tx = self.web3.eth().transaction(TransactionId::Hash(tx_hash)).await?; + let maybe_tx = self.transaction(TransactionId::Hash(tx_hash)).await?; if let Some(tx) = maybe_tx { let signed_tx = signed_tx_from_web3_tx(tx).map_to_mm(Web3RpcError::InvalidResponse)?; return Ok(Some(signed_tx)); @@ -4807,7 +4860,7 @@ impl EthCoin { ))); } - let web3_receipt = match selfi.web3.eth().transaction_receipt(payment_hash).await { + let web3_receipt = match selfi.transaction_receipt(payment_hash).await { Ok(r) => r, Err(e) => { error!( @@ -4855,7 +4908,7 @@ impl EthCoin { ))); } - match selfi.web3.eth().block_number().await { + match selfi.block_number().await { Ok(current_block) => { if current_block >= block_number { break Ok(()); @@ -4885,6 +4938,75 @@ impl EthCoin { Ok(()) } + + /// Requests the nonce from all available nodes and returns the highest nonce available with the list of nodes that returned the highest nonce. + /// Transactions will be sent using the nodes that returned the highest nonce. + fn get_addr_nonce(self, addr: Address) -> Box), Error = String> + Send> { + const TMP_SOCKET_DURATION: Duration = Duration::from_secs(300); + + let fut = async move { + let mut errors: u32 = 0; + let web3_instances = self.web3_instances.lock().await.to_vec(); + loop { + let (futures, web3_instances): (Vec<_>, Vec<_>) = web3_instances + .iter() + .map(|instance| { + if let Web3Transport::Websocket(socket_transport) = instance.web3.transport() { + socket_transport.maybe_spawn_temporary_connection_loop( + self.clone(), + Instant::now() + TMP_SOCKET_DURATION, + ); + }; + + if instance.is_parity { + let parity: ParityNonce<_> = instance.web3.api(); + (Either::Left(parity.parity_next_nonce(addr)), instance.clone()) + } else { + ( + Either::Right(instance.web3.eth().transaction_count(addr, Some(BlockNumber::Pending))), + instance.clone(), + ) + } + }) + .unzip(); + + let nonces: Vec<_> = join_all(futures) + .await + .into_iter() + .zip(web3_instances.into_iter()) + .filter_map(|(nonce_res, instance)| match nonce_res { + Ok(n) => Some((n, instance)), + Err(e) => { + error!("Error getting nonce for addr {:?}: {}", addr, e); + None + }, + }) + .collect(); + if nonces.is_empty() { + // all requests errored + errors += 1; + if errors > 5 { + return ERR!("Couldn't get nonce after 5 errored attempts, aborting"); + } + } else { + let max = nonces + .iter() + .map(|(n, _)| *n) + .max() + .expect("nonces should not be empty!"); + break Ok(( + max, + nonces + .into_iter() + .filter_map(|(n, instance)| if n == max { Some(instance) } else { None }) + .collect(), + )); + } + Timer::sleep(1.).await + } + }; + Box::new(Box::pin(fut).compat()) + } } #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] @@ -5118,7 +5240,7 @@ impl MmCoin for EthCoin { // Please note if the wallet's balance is insufficient to withdraw, then `estimate_gas` may fail with the `Exception` error. // Ideally we should determine the case when we have the insufficient balance and return `TradePreimageError::NotSufficientBalance` error. - let gas_limit = self.estimate_gas(estimate_gas_req).compat().await?; + let gas_limit = self.estimate_gas_wrapper(estimate_gas_req).compat().await?; let total_fee = gas_limit * gas_price; let amount = u256_to_big_decimal(total_fee, ETH_DECIMALS)?; Ok(TradeFee { @@ -5256,7 +5378,7 @@ fn validate_fee_impl(coin: EthCoin, validate_fee_args: EthValidateFeeArgs<'_>) - let fut = async move { let expected_value = wei_from_big_decimal(&amount, coin.decimals)?; - let tx_from_rpc = coin.web3.eth().transaction(TransactionId::Hash(fee_tx_hash)).await?; + let tx_from_rpc = coin.transaction(TransactionId::Hash(fee_tx_hash)).await?; let tx_from_rpc = tx_from_rpc.as_ref().ok_or_else(|| { ValidatePaymentError::TxDoesNotExist(format!("Didn't find provided tx {:?} on ETH node", fee_tx_hash)) @@ -5572,15 +5694,6 @@ pub async fn eth_coin_from_conf_and_request( let mut rng = small_rng(); urls.as_mut_slice().shuffle(&mut rng); - let mut nodes = vec![]; - for url in urls.iter() { - nodes.push(HttpTransportNode { - uri: try_s!(url.parse()), - gui_auth: false, - }); - } - drop_mutability!(nodes); - let swap_contract_address: Address = try_s!(json::from_value(req["swap_contract_address"].clone())); if swap_contract_address == Address::default() { return ERR!("swap_contract_address can't be zero address"); @@ -5602,16 +5715,53 @@ pub async fn eth_coin_from_conf_and_request( let mut web3_instances = vec![]; let event_handlers = rpc_event_handlers_for_eth_transport(ctx, ticker.to_string()); - for node in nodes.iter() { - let transport = Web3Transport::new_http(vec![node.clone()], event_handlers.clone()); + for url in urls.iter() { + let uri: Uri = try_s!(url.parse()); + + let transport = match uri.scheme_str() { + Some("ws") | Some("wss") => { + const TMP_SOCKET_CONNECTION: Duration = Duration::from_secs(20); + + let node = WebsocketTransportNode { + uri: uri.clone(), + gui_auth: false, + }; + let websocket_transport = WebsocketTransport::with_event_handlers(node, event_handlers.clone()); + + // Temporarily start the connection loop (we close the connection once we have the client version below). + // Ideally, it would be much better to not do this workaround, which requires a lot of refactoring or + // dropping websocket support on parity nodes. + let fut = websocket_transport + .clone() + .start_connection_loop(Some(Instant::now() + TMP_SOCKET_CONNECTION)); + let settings = AbortSettings::info_on_abort(format!("connection loop stopped for {:?}", uri)); + ctx.spawner().spawn_with_settings(fut, settings); + + Web3Transport::Websocket(websocket_transport) + }, + Some("http") | Some("https") => { + let node = HttpTransportNode { uri, gui_auth: false }; + + Web3Transport::new_http_with_event_handlers(node, event_handlers.clone()) + }, + _ => { + return ERR!( + "Invalid node address '{}'. Only http(s) and ws(s) nodes are supported", + uri + ); + }, + }; + let web3 = Web3::new(transport); let version = match web3.web3().client_version().await { Ok(v) => v, Err(e) => { - error!("Couldn't get client version for url {}: {}", node.uri, e); + error!("Couldn't get client version for url {}: {}", url, e); + continue; }, }; + web3_instances.push(Web3Instance { web3, is_parity: version.contains("Parity") || version.contains("parity"), @@ -5622,9 +5772,6 @@ pub async fn eth_coin_from_conf_and_request( return ERR!("Failed to get client version for all urls"); } - let transport = Web3Transport::new_http(nodes, event_handlers); - let web3 = Web3::new(transport); - let (coin_type, decimals) = match protocol { CoinProtocol::ETH => (EthCoinType::Eth, ETH_DECIMALS), CoinProtocol::ERC20 { @@ -5633,7 +5780,16 @@ pub async fn eth_coin_from_conf_and_request( } => { let token_addr = try_s!(valid_addr_from_str(&contract_address)); let decimals = match conf["decimals"].as_u64() { - None | Some(0) => try_s!(get_token_decimals(&web3, token_addr).await), + None | Some(0) => try_s!( + get_token_decimals( + &web3_instances + .first() + .expect("web3_instances can't be empty in ETH activation") + .web3, + token_addr + ) + .await + ), Some(d) => d as u8, }; (EthCoinType::Erc20 { platform, token_addr }, decimals) @@ -5694,8 +5850,7 @@ pub async fn eth_coin_from_conf_and_request( gas_station_url: try_s!(json::from_value(req["gas_station_url"].clone())), gas_station_decimals: gas_station_decimals.unwrap_or(ETH_GAS_STATION_DECIMALS), gas_station_policy, - web3, - web3_instances, + web3_instances: AsyncMutex::new(web3_instances), history_sync_state: Mutex::new(initial_history_state), ctx: ctx.weak(), required_confirmations, @@ -5751,69 +5906,6 @@ pub(crate) fn eth_addr_to_hex(address: &Address) -> String { format!("{:#02x}", /// The input must be 0x prefixed hex string fn is_valid_checksum_addr(addr: &str) -> bool { addr == checksum_address(addr) } -/// Requests the nonce from all available nodes and returns the highest nonce available with the list of nodes that returned the highest nonce. -/// Transactions will be sent using the nodes that returned the highest nonce. -#[cfg_attr(test, mockable)] -fn get_addr_nonce( - addr: Address, - web3s: Vec, -) -> Box), Error = String> + Send> { - let fut = async move { - let mut errors: u32 = 0; - loop { - let (futures, web3s): (Vec<_>, Vec<_>) = web3s - .iter() - .map(|web3| { - if web3.is_parity { - let parity: ParityNonce<_> = web3.web3.api(); - (Either::Left(parity.parity_next_nonce(addr)), web3.clone()) - } else { - ( - Either::Right(web3.web3.eth().transaction_count(addr, Some(BlockNumber::Pending))), - web3.clone(), - ) - } - }) - .unzip(); - - let nonces: Vec<_> = join_all(futures) - .await - .into_iter() - .zip(web3s.into_iter()) - .filter_map(|(nonce_res, web3)| match nonce_res { - Ok(n) => Some((n, web3)), - Err(e) => { - error!("Error getting nonce for addr {:?}: {}", addr, e); - None - }, - }) - .collect(); - if nonces.is_empty() { - // all requests errored - errors += 1; - if errors > 5 { - return ERR!("Couldn't get nonce after 5 errored attempts, aborting"); - } - } else { - let max = nonces - .iter() - .map(|(n, _)| *n) - .max() - .expect("nonces should not be empty!"); - break Ok(( - max, - nonces - .into_iter() - .filter_map(|(n, web3)| if n == max { Some(web3) } else { None }) - .collect(), - )); - } - Timer::sleep(1.).await - } - }; - Box::new(Box::pin(fut).compat()) -} - fn increase_by_percent_one_gwei(num: U256, percent: u64) -> U256 { let one_gwei = U256::from(10u64.pow(9)); let percent = (num / U256::from(100)) * U256::from(percent); @@ -5976,7 +6068,7 @@ async fn get_eth_gas_details( }; // TODO Note if the wallet's balance is insufficient to withdraw, then `estimate_gas` may fail with the `Exception` error. // TODO Ideally we should determine the case when we have the insufficient balance and return `WithdrawError::NotSufficientBalance`. - let gas_limit = eth_coin.estimate_gas(estimate_gas_req).compat().await?; + let gas_limit = eth_coin.estimate_gas_wrapper(estimate_gas_req).compat().await?; Ok((gas_limit, gas_price)) }, } diff --git a/mm2src/coins/eth/eth_balance_events.rs b/mm2src/coins/eth/eth_balance_events.rs index 7f31f9db87..b6c1441d73 100644 --- a/mm2src/coins/eth/eth_balance_events.rs +++ b/mm2src/coins/eth/eth_balance_events.rs @@ -79,9 +79,7 @@ impl EventBehaviour for EthCoin { async fn handle(self, interval: f64, tx: oneshot::Sender) { const RECEIVER_DROPPED_MSG: &str = "Receiver is dropped, which should never happen."; - async fn with_socket(_coin: EthCoin, _ctx: MmArc) { todo!() } - - async fn with_polling(coin: EthCoin, ctx: MmArc, interval: f64) { + async fn start_polling(coin: EthCoin, ctx: MmArc, interval: f64) { let mut cache: HashMap = HashMap::new(); loop { @@ -147,7 +145,7 @@ impl EventBehaviour for EthCoin { tx.send(EventInitStatus::Success).expect(RECEIVER_DROPPED_MSG); - with_polling(self, ctx, interval).await + start_polling(self, ctx, interval).await } async fn spawn_if_active(self, config: &EventStreamConfiguration) -> EventInitStatus { diff --git a/mm2src/coins/eth/eth_rpc.rs b/mm2src/coins/eth/eth_rpc.rs new file mode 100644 index 0000000000..36b2c3221c --- /dev/null +++ b/mm2src/coins/eth/eth_rpc.rs @@ -0,0 +1,455 @@ +//! This module serves as an abstraction layer for Ethereum RPCs. +//! Unlike the built-in functions in web3, this module dynamically +//! rotates through all transports in case of failures. + +use super::web3_transport::FeeHistoryResult; +use super::{web3_transport::Web3Transport, EthCoin}; +use common::{custom_futures::timeout::FutureTimerExt, log::debug}; +use instant::Duration; +use serde_json::Value; +use web3::types::{Address, Block, BlockId, BlockNumber, Bytes, CallRequest, FeeHistory, Filter, Log, Proof, SyncState, + Trace, TraceFilter, Transaction, TransactionId, TransactionReceipt, TransactionRequest, Work, H256, + H520, H64, U256, U64}; +use web3::{helpers, Transport}; + +pub(crate) const ETH_RPC_REQUEST_TIMEOUT: Duration = Duration::from_secs(10); + +impl EthCoin { + async fn try_rpc_send(&self, method: &str, params: Vec) -> Result { + let mut clients = self.web3_instances.lock().await; + + for (i, client) in clients.clone().into_iter().enumerate() { + let execute_fut = match client.web3.transport() { + Web3Transport::Http(http) => http.execute(method, params.clone()), + Web3Transport::Websocket(socket) => { + socket.maybe_spawn_connection_loop(self.clone()); + socket.execute(method, params.clone()) + }, + #[cfg(target_arch = "wasm32")] + Web3Transport::Metamask(metamask) => metamask.execute(method, params.clone()), + }; + + match execute_fut.timeout(ETH_RPC_REQUEST_TIMEOUT).await { + Ok(Ok(r)) => { + // Bring the live client to the front of rpc_clients + clients.rotate_left(i); + return Ok(r); + }, + Ok(Err(rpc_error)) => { + debug!("Request on '{method}' failed. Error: {rpc_error}"); + + if let Web3Transport::Websocket(socket_transport) = client.web3.transport() { + socket_transport.stop_connection_loop().await; + }; + }, + Err(timeout_error) => { + debug!("Timeout exceed for '{method}' request. Error: {timeout_error}",); + + if let Web3Transport::Websocket(socket_transport) = client.web3.transport() { + socket_transport.stop_connection_loop().await; + }; + }, + }; + } + + Err(web3::Error::Transport(web3::error::TransportError::Message(format!( + "Request '{method}' failed due to not being able to find a living RPC client" + )))) + } +} + +#[allow(dead_code)] +impl EthCoin { + /// Get list of available accounts. + pub(crate) async fn accounts(&self) -> Result, web3::Error> { + self.try_rpc_send("eth_accounts", vec![]) + .await + .and_then(|t| serde_json::from_value(t).map_err(Into::into)) + } + + /// Get current block number + pub(crate) async fn block_number(&self) -> Result { + self.try_rpc_send("eth_blockNumber", vec![]) + .await + .and_then(|t| serde_json::from_value(t).map_err(Into::into)) + } + + /// Call a constant method of contract without changing the state of the blockchain. + pub(crate) async fn call(&self, req: CallRequest, block: Option) -> Result { + let req = helpers::serialize(&req); + let block = helpers::serialize(&block.unwrap_or_else(|| BlockNumber::Latest.into())); + + self.try_rpc_send("eth_call", vec![req, block]) + .await + .and_then(|t| serde_json::from_value(t).map_err(Into::into)) + } + + /// Get coinbase address + pub(crate) async fn coinbase(&self) -> Result { + self.try_rpc_send("eth_coinbase", vec![]) + .await + .and_then(|t| serde_json::from_value(t).map_err(Into::into)) + } + + /// Compile LLL + pub(crate) async fn compile_lll(&self, code: String) -> Result { + let code = helpers::serialize(&code); + self.try_rpc_send("eth_compileLLL", vec![code]) + .await + .and_then(|t| serde_json::from_value(t).map_err(Into::into)) + } + + /// Compile Solidity + pub(crate) async fn compile_solidity(&self, code: String) -> Result { + let code = helpers::serialize(&code); + self.try_rpc_send("eth_compileSolidity", vec![code]) + .await + .and_then(|t| serde_json::from_value(t).map_err(Into::into)) + } + + /// Compile Serpent + pub(crate) async fn compile_serpent(&self, code: String) -> Result { + let code = helpers::serialize(&code); + self.try_rpc_send("eth_compileSerpent", vec![code]) + .await + .and_then(|t| serde_json::from_value(t).map_err(Into::into)) + } + + /// Call a contract without changing the state of the blockchain to estimate gas usage. + pub(crate) async fn estimate_gas(&self, req: CallRequest, block: Option) -> Result { + let req = helpers::serialize(&req); + + let args = match block { + Some(block) => vec![req, helpers::serialize(&block)], + None => vec![req], + }; + + self.try_rpc_send("eth_estimateGas", args) + .await + .and_then(|t| serde_json::from_value(t).map_err(Into::into)) + } + + /// Get current recommended gas price + pub(crate) async fn gas_price(&self) -> Result { + self.try_rpc_send("eth_gasPrice", vec![]) + .await + .and_then(|t| serde_json::from_value(t).map_err(Into::into)) + } + + /// Returns a collection of historical gas information. This can be used for evaluating the max_fee_per_gas + /// and max_priority_fee_per_gas to send the future transactions. + pub(crate) async fn fee_history( + &self, + block_count: U256, + newest_block: BlockNumber, + reward_percentiles: Option>, + ) -> Result { + let block_count = helpers::serialize(&block_count); + let newest_block = helpers::serialize(&newest_block); + let reward_percentiles = helpers::serialize(&reward_percentiles); + + self.try_rpc_send("eth_feeHistory", vec![block_count, newest_block, reward_percentiles]) + .await + .and_then(|t| serde_json::from_value(t).map_err(Into::into)) + } + + /// Get balance of given address + pub(crate) async fn balance(&self, address: Address, block: Option) -> Result { + let address = helpers::serialize(&address); + let block = helpers::serialize(&block.unwrap_or(BlockNumber::Latest)); + + self.try_rpc_send("eth_getBalance", vec![address, block]) + .await + .and_then(|t| serde_json::from_value(t).map_err(Into::into)) + } + + /// Get all logs matching a given filter object + pub(crate) async fn logs(&self, filter: Filter) -> Result, web3::Error> { + let filter = helpers::serialize(&filter); + self.try_rpc_send("eth_getLogs", vec![filter]) + .await + .and_then(|t| serde_json::from_value(t).map_err(Into::into)) + } + + /// Get block details with transaction hashes. + pub(crate) async fn block(&self, block: BlockId) -> Result>, web3::Error> { + let include_txs = helpers::serialize(&false); + + let result = match block { + BlockId::Hash(hash) => { + let hash = helpers::serialize(&hash); + self.try_rpc_send("eth_getBlockByHash", vec![hash, include_txs]) + }, + BlockId::Number(num) => { + let num = helpers::serialize(&num); + self.try_rpc_send("eth_getBlockByNumber", vec![num, include_txs]) + }, + }; + + result.await.and_then(|t| serde_json::from_value(t).map_err(Into::into)) + } + + /// Get block details with full transaction objects. + pub(crate) async fn block_with_txs(&self, block: BlockId) -> Result>, web3::Error> { + let include_txs = helpers::serialize(&true); + + let result = match block { + BlockId::Hash(hash) => { + let hash = helpers::serialize(&hash); + self.try_rpc_send("eth_getBlockByHash", vec![hash, include_txs]) + }, + BlockId::Number(num) => { + let num = helpers::serialize(&num); + self.try_rpc_send("eth_getBlockByNumber", vec![num, include_txs]) + }, + }; + + result.await.and_then(|t| serde_json::from_value(t).map_err(Into::into)) + } + + /// Get number of transactions in block + pub(crate) async fn block_transaction_count(&self, block: BlockId) -> Result, web3::Error> { + let result = match block { + BlockId::Hash(hash) => { + let hash = helpers::serialize(&hash); + self.try_rpc_send("eth_getBlockTransactionCountByHash", vec![hash]) + }, + BlockId::Number(num) => { + let num = helpers::serialize(&num); + + self.try_rpc_send("eth_getBlockTransactionCountByNumber", vec![num]) + }, + }; + + result.await.and_then(|t| serde_json::from_value(t).map_err(Into::into)) + } + + /// Get code under given address + pub(crate) async fn code(&self, address: Address, block: Option) -> Result { + let address = helpers::serialize(&address); + let block = helpers::serialize(&block.unwrap_or(BlockNumber::Latest)); + + self.try_rpc_send("eth_getCode", vec![address, block]) + .await + .and_then(|t| serde_json::from_value(t).map_err(Into::into)) + } + + /// Get supported compilers + pub(crate) async fn compilers(&self) -> Result, web3::Error> { + self.try_rpc_send("eth_getCompilers", vec![]) + .await + .and_then(|t| serde_json::from_value(t).map_err(Into::into)) + } + + /// Get chain id + pub(crate) async fn chain_id(&self) -> Result { + self.try_rpc_send("eth_chainId", vec![]) + .await + .and_then(|t| serde_json::from_value(t).map_err(Into::into)) + } + + /// Get available user accounts. This method is only available in the browser. With MetaMask, + /// this will cause the popup that prompts the user to allow or deny access to their accounts + /// to your app. + pub(crate) async fn request_accounts(&self) -> Result, web3::Error> { + self.try_rpc_send("eth_requestAccounts", vec![]) + .await + .and_then(|t| serde_json::from_value(t).map_err(Into::into)) + } + + /// Get storage entry + pub(crate) async fn storage( + &self, + address: Address, + idx: U256, + block: Option, + ) -> Result { + let address = helpers::serialize(&address); + let idx = helpers::serialize(&idx); + let block = helpers::serialize(&block.unwrap_or(BlockNumber::Latest)); + + self.try_rpc_send("eth_getStorageAt", vec![address, idx, block]) + .await + .and_then(|t| serde_json::from_value(t).map_err(Into::into)) + } + + /// Get nonce + pub(crate) async fn transaction_count( + &self, + address: Address, + block: Option, + ) -> Result { + let address = helpers::serialize(&address); + let block = helpers::serialize(&block.unwrap_or(BlockNumber::Latest)); + + self.try_rpc_send("eth_getTransactionCount", vec![address, block]) + .await + .and_then(|t| serde_json::from_value(t).map_err(Into::into)) + } + + /// Get transaction + pub(crate) async fn transaction(&self, id: TransactionId) -> Result, web3::Error> { + let result = match id { + TransactionId::Hash(hash) => { + let hash = helpers::serialize(&hash); + self.try_rpc_send("eth_getTransactionByHash", vec![hash]) + }, + TransactionId::Block(BlockId::Hash(hash), index) => { + let hash = helpers::serialize(&hash); + let idx = helpers::serialize(&index); + self.try_rpc_send("eth_getTransactionByBlockHashAndIndex", vec![hash, idx]) + }, + TransactionId::Block(BlockId::Number(number), index) => { + let number = helpers::serialize(&number); + let idx = helpers::serialize(&index); + self.try_rpc_send("eth_getTransactionByBlockNumberAndIndex", vec![number, idx]) + }, + }; + + result.await.and_then(|t| serde_json::from_value(t).map_err(Into::into)) + } + + /// Get transaction receipt + pub(crate) async fn transaction_receipt(&self, hash: H256) -> Result, web3::Error> { + let hash = helpers::serialize(&hash); + + self.try_rpc_send("eth_getTransactionReceipt", vec![hash]) + .await + .and_then(|t| serde_json::from_value(t).map_err(Into::into)) + } + + /// Get work package + pub(crate) async fn work(&self) -> Result { + self.try_rpc_send("eth_getWork", vec![]) + .await + .and_then(|t| serde_json::from_value(t).map_err(Into::into)) + } + + /// Get hash rate + pub(crate) async fn hashrate(&self) -> Result { + self.try_rpc_send("eth_hashrate", vec![]) + .await + .and_then(|t| serde_json::from_value(t).map_err(Into::into)) + } + + /// Get mining status + pub(crate) async fn mining(&self) -> Result { + self.try_rpc_send("eth_mining", vec![]) + .await + .and_then(|t| serde_json::from_value(t).map_err(Into::into)) + } + + /// Start new block filter + pub(crate) async fn new_block_filter(&self) -> Result { + self.try_rpc_send("eth_newBlockFilter", vec![]) + .await + .and_then(|t| serde_json::from_value(t).map_err(Into::into)) + } + + /// Start new pending transaction filter + pub(crate) async fn new_pending_transaction_filter(&self) -> Result { + self.try_rpc_send("eth_newPendingTransactionFilter", vec![]) + .await + .and_then(|t| serde_json::from_value(t).map_err(Into::into)) + } + + /// Start new pending transaction filter + pub(crate) async fn protocol_version(&self) -> Result { + self.try_rpc_send("eth_protocolVersion", vec![]) + .await + .and_then(|t| serde_json::from_value(t).map_err(Into::into)) + } + + /// Sends a rlp-encoded signed transaction + pub(crate) async fn send_raw_transaction(&self, rlp: Bytes) -> Result { + let rlp = helpers::serialize(&rlp); + self.try_rpc_send("eth_sendRawTransaction", vec![rlp]) + .await + .and_then(|t| serde_json::from_value(t).map_err(Into::into)) + } + + /// Sends a transaction transaction + pub(crate) async fn send_transaction(&self, tx: TransactionRequest) -> Result { + let tx = helpers::serialize(&tx); + self.try_rpc_send("eth_sendTransaction", vec![tx]) + .await + .and_then(|t| serde_json::from_value(t).map_err(Into::into)) + } + + /// Signs a hash of given data + pub(crate) async fn sign(&self, address: Address, data: Bytes) -> Result { + let address = helpers::serialize(&address); + let data = helpers::serialize(&data); + self.try_rpc_send("eth_sign", vec![address, data]) + .await + .and_then(|t| serde_json::from_value(t).map_err(Into::into)) + } + + /// Submit hashrate of external miner + pub(crate) async fn submit_hashrate(&self, rate: U256, id: H256) -> Result { + let rate = helpers::serialize(&rate); + let id = helpers::serialize(&id); + self.try_rpc_send("eth_submitHashrate", vec![rate, id]) + .await + .and_then(|t| serde_json::from_value(t).map_err(Into::into)) + } + + /// Submit work of external miner + pub(crate) async fn submit_work(&self, nonce: H64, pow_hash: H256, mix_hash: H256) -> Result { + let nonce = helpers::serialize(&nonce); + let pow_hash = helpers::serialize(&pow_hash); + let mix_hash = helpers::serialize(&mix_hash); + self.try_rpc_send("eth_submitWork", vec![nonce, pow_hash, mix_hash]) + .await + .and_then(|t| serde_json::from_value(t).map_err(Into::into)) + } + + /// Get syncing status + pub(crate) async fn syncing(&self) -> Result { + self.try_rpc_send("eth_syncing", vec![]) + .await + .and_then(|t| serde_json::from_value(t).map_err(Into::into)) + } + + /// Returns the account- and storage-values of the specified account including the Merkle-proof. + pub(crate) async fn proof( + &self, + address: Address, + keys: Vec, + block: Option, + ) -> Result, web3::Error> { + let add = helpers::serialize(&address); + let ks = helpers::serialize(&keys); + let blk = helpers::serialize(&block.unwrap_or(BlockNumber::Latest)); + self.try_rpc_send("eth_getProof", vec![add, ks, blk]) + .await + .and_then(|t| serde_json::from_value(t).map_err(Into::into)) + } + + pub(crate) async fn eth_fee_history( + &self, + count: U256, + block: BlockNumber, + reward_percentiles: &[f64], + ) -> Result { + let count = helpers::serialize(&count); + let block = helpers::serialize(&block); + let reward_percentiles = helpers::serialize(&reward_percentiles); + let params = vec![count, block, reward_percentiles]; + + self.try_rpc_send("eth_feeHistory", params) + .await + .and_then(|t| serde_json::from_value(t).map_err(Into::into)) + } + + /// Return traces matching the given filter + /// + /// See [TraceFilterBuilder](../types/struct.TraceFilterBuilder.html) + pub(crate) async fn trace_filter(&self, filter: TraceFilter) -> Result, web3::Error> { + let filter = helpers::serialize(&filter); + + self.try_rpc_send("trace_filter", vec![filter]) + .await + .and_then(|t| serde_json::from_value(t).map_err(Into::into)) + } +} diff --git a/mm2src/coins/eth/eth_tests.rs b/mm2src/coins/eth/eth_tests.rs index 278ce1c124..6b05d5c6f2 100644 --- a/mm2src/coins/eth/eth_tests.rs +++ b/mm2src/coins/eth/eth_tests.rs @@ -104,17 +104,21 @@ fn eth_coin_from_keypair( fallback_swap_contract: Option
, key_pair: KeyPair, ) -> (MmArc, EthCoin) { - let mut nodes = vec![]; + let mut web3_instances = vec![]; for url in urls.iter() { - nodes.push(HttpTransportNode { + let node = HttpTransportNode { uri: url.parse().unwrap(), gui_auth: false, - }); + }; + + let transport = Web3Transport::new_http(node); + let web3 = Web3::new(transport); + + web3_instances.push(Web3Instance { web3, is_parity: false }); } - drop_mutability!(nodes); - let transport = Web3Transport::with_nodes(nodes); - let web3 = Web3::new(transport); + drop_mutability!(web3_instances); + let conf = json!({ "coins":[ eth_testnet_conf(), @@ -141,11 +145,7 @@ fn eth_coin_from_keypair( fallback_swap_contract, contract_supports_watchers: false, ticker, - web3_instances: vec![Web3Instance { - web3: web3.clone(), - is_parity: false, - }], - web3, + web3_instances: AsyncMutex::new(web3_instances), ctx: ctx.weak(), required_confirmations: 1.into(), chain_id: None, @@ -300,10 +300,27 @@ fn test_nonce_several_urls() { ) .unwrap(); - let devnet_transport = Web3Transport::single_node(ETH_DEV_NODE, false); - let sepolia_transport = Web3Transport::single_node("https://rpc2.sepolia.org", false); + let node = HttpTransportNode { + uri: ETH_DEV_NODE.parse().unwrap(), + gui_auth: false, + }; + + let devnet_transport = Web3Transport::new_http(node); + + let node = HttpTransportNode { + uri: "https://rpc2.sepolia.org".parse().unwrap(), + gui_auth: false, + }; + + let sepolia_transport = Web3Transport::new_http(node); + + let node = HttpTransportNode { + uri: "http://195.201.0.6:8989".parse().unwrap(), + gui_auth: false, + }; + // get nonce must succeed if some nodes are down at the moment for some reason - let failing_transport = Web3Transport::single_node("http://195.201.0.6:8989", false); + let failing_transport = Web3Transport::new_http(node); let web3_devnet = Web3::new(devnet_transport); let web3_sepolia = Web3::new(sepolia_transport); @@ -319,9 +336,9 @@ fn test_nonce_several_urls() { swap_contract_address: Address::from_str(ETH_DEV_SWAP_CONTRACT).unwrap(), fallback_swap_contract: None, contract_supports_watchers: false, - web3_instances: vec![ + web3_instances: AsyncMutex::new(vec![ Web3Instance { - web3: web3_devnet.clone(), + web3: web3_devnet, is_parity: false, }, Web3Instance { @@ -332,8 +349,7 @@ fn test_nonce_several_urls() { web3: web3_failing, is_parity: false, }, - ], - web3: web3_devnet, + ]), decimals: 18, gas_station_url: Some("https://ethgasstation.info/json/ethgasAPI.json".into()), gas_station_decimals: ETH_GAS_STATION_DECIMALS, @@ -353,9 +369,7 @@ fn test_nonce_several_urls() { let payment = coin.send_to_address(coin.my_address, 200000000.into()).wait().unwrap(); log!("{:?}", payment); - let new_nonce = get_addr_nonce(coin.my_address, coin.web3_instances.clone()) - .wait() - .unwrap(); + let new_nonce = coin.clone().get_addr_nonce(coin.my_address).wait().unwrap(); log!("{:?}", new_nonce); } @@ -368,7 +382,12 @@ fn test_wait_for_payment_spend_timeout() { &hex::decode("809465b17d0a4ddb3e4c69e8f23c2cabad868f51f8bed5c765ad1d6516c3306f").unwrap(), ) .unwrap(); - let transport = Web3Transport::single_node(ETH_DEV_NODE, false); + let node = HttpTransportNode { + uri: ETH_DEV_NODE.parse().unwrap(), + gui_auth: false, + }; + + let transport = Web3Transport::new_http(node); let web3 = Web3::new(transport); let ctx = MmCtxBuilder::new().into_mm_arc(); @@ -386,11 +405,7 @@ fn test_wait_for_payment_spend_timeout() { fallback_swap_contract: None, contract_supports_watchers: false, ticker: "ETH".into(), - web3_instances: vec![Web3Instance { - web3: web3.clone(), - is_parity: false, - }], - web3, + web3_instances: AsyncMutex::new(vec![Web3Instance { web3, is_parity: false }]), ctx: ctx.weak(), required_confirmations: 1.into(), chain_id: None, @@ -472,7 +487,7 @@ fn test_withdraw_impl_manual_fee() { let balance = wei_from_big_decimal(&1000000000.into(), 18).unwrap(); MockResult::Return(Box::new(futures01::future::ok(balance))) }); - get_addr_nonce.mock_safe(|_, _| MockResult::Return(Box::new(futures01::future::ok((0.into(), vec![]))))); + EthCoin::get_addr_nonce.mock_safe(|_, _| MockResult::Return(Box::new(futures01::future::ok((0.into(), vec![]))))); let withdraw_req = WithdrawRequest { amount: 1.into(), @@ -516,7 +531,7 @@ fn test_withdraw_impl_fee_details() { let balance = wei_from_big_decimal(&1000000000.into(), 18).unwrap(); MockResult::Return(Box::new(futures01::future::ok(balance))) }); - get_addr_nonce.mock_safe(|_, _| MockResult::Return(Box::new(futures01::future::ok((0.into(), vec![]))))); + EthCoin::get_addr_nonce.mock_safe(|_, _| MockResult::Return(Box::new(futures01::future::ok((0.into(), vec![]))))); let withdraw_req = WithdrawRequest { amount: 1.into(), @@ -647,7 +662,7 @@ fn get_erc20_sender_trade_preimage() { .mock_safe(|_, _| MockResult::Return(Box::new(futures01::future::ok(unsafe { ALLOWANCE.into() })))); EthCoin::get_gas_price.mock_safe(|_| MockResult::Return(Box::new(futures01::future::ok(GAS_PRICE.into())))); - EthCoin::estimate_gas.mock_safe(|_, _| { + EthCoin::estimate_gas_wrapper.mock_safe(|_, _| { unsafe { ESTIMATE_GAS_CALLED = true }; MockResult::Return(Box::new(futures01::future::ok(APPROVE_GAS_LIMIT.into()))) }); @@ -745,7 +760,7 @@ fn test_get_fee_to_send_taker_fee() { const TRANSFER_GAS_LIMIT: u64 = 40_000; EthCoin::get_gas_price.mock_safe(|_| MockResult::Return(Box::new(futures01::future::ok(GAS_PRICE.into())))); - EthCoin::estimate_gas + EthCoin::estimate_gas_wrapper .mock_safe(|_, _| MockResult::Return(Box::new(futures01::future::ok(TRANSFER_GAS_LIMIT.into())))); // fee to send taker fee is `TRANSFER_GAS_LIMIT * gas_price` always. @@ -821,7 +836,7 @@ fn validate_dex_fee_invalid_sender_eth() { let (_ctx, coin) = eth_coin_for_test(EthCoinType::Eth, &[ETH_MAINNET_NODE], None); // the real dex fee sent on mainnet // https://etherscan.io/tx/0x7e9ca16c85efd04ee5e31f2c1914b48f5606d6f9ce96ecce8c96d47d6857278f - let tx = block_on(coin.web3.eth().transaction(TransactionId::Hash( + let tx = block_on(block_on(coin.web3()).unwrap().eth().transaction(TransactionId::Hash( H256::from_str("0x7e9ca16c85efd04ee5e31f2c1914b48f5606d6f9ce96ecce8c96d47d6857278f").unwrap(), ))) .unwrap() @@ -855,7 +870,7 @@ fn validate_dex_fee_invalid_sender_erc() { ); // the real dex fee sent on mainnet // https://etherscan.io/tx/0xd6403b41c79f9c9e9c83c03d920ee1735e7854d85d94cef48d95dfeca95cd600 - let tx = block_on(coin.web3.eth().transaction(TransactionId::Hash( + let tx = block_on(block_on(coin.web3()).unwrap().eth().transaction(TransactionId::Hash( H256::from_str("0xd6403b41c79f9c9e9c83c03d920ee1735e7854d85d94cef48d95dfeca95cd600").unwrap(), ))) .unwrap() @@ -891,7 +906,7 @@ fn validate_dex_fee_eth_confirmed_before_min_block() { let (_ctx, coin) = eth_coin_for_test(EthCoinType::Eth, &[ETH_MAINNET_NODE], None); // the real dex fee sent on mainnet // https://etherscan.io/tx/0x7e9ca16c85efd04ee5e31f2c1914b48f5606d6f9ce96ecce8c96d47d6857278f - let tx = block_on(coin.web3.eth().transaction(TransactionId::Hash( + let tx = block_on(block_on(coin.web3()).unwrap().eth().transaction(TransactionId::Hash( H256::from_str("0x7e9ca16c85efd04ee5e31f2c1914b48f5606d6f9ce96ecce8c96d47d6857278f").unwrap(), ))) .unwrap() @@ -927,7 +942,7 @@ fn validate_dex_fee_erc_confirmed_before_min_block() { ); // the real dex fee sent on mainnet // https://etherscan.io/tx/0xd6403b41c79f9c9e9c83c03d920ee1735e7854d85d94cef48d95dfeca95cd600 - let tx = block_on(coin.web3.eth().transaction(TransactionId::Hash( + let tx = block_on(block_on(coin.web3()).unwrap().eth().transaction(TransactionId::Hash( H256::from_str("0xd6403b41c79f9c9e9c83c03d920ee1735e7854d85d94cef48d95dfeca95cd600").unwrap(), ))) .unwrap() @@ -1077,7 +1092,12 @@ fn test_message_hash() { &hex::decode("809465b17d0a4ddb3e4c69e8f23c2cabad868f51f8bed5c765ad1d6516c3306f").unwrap(), ) .unwrap(); - let transport = Web3Transport::single_node(ETH_DEV_NODE, false); + let node = HttpTransportNode { + uri: ETH_DEV_NODE.parse().unwrap(), + gui_auth: false, + }; + + let transport = Web3Transport::new_http(node); let web3 = Web3::new(transport); let ctx = MmCtxBuilder::new().into_mm_arc(); let coin = EthCoin(Arc::new(EthCoinImpl { @@ -1089,11 +1109,7 @@ fn test_message_hash() { swap_contract_address: Address::from_str(ETH_DEV_SWAP_CONTRACT).unwrap(), fallback_swap_contract: None, contract_supports_watchers: false, - web3_instances: vec![Web3Instance { - web3: web3.clone(), - is_parity: false, - }], - web3, + web3_instances: AsyncMutex::new(vec![Web3Instance { web3, is_parity: false }]), decimals: 18, gas_station_url: None, gas_station_decimals: ETH_GAS_STATION_DECIMALS, @@ -1121,7 +1137,13 @@ fn test_sign_verify_message() { &hex::decode("809465b17d0a4ddb3e4c69e8f23c2cabad868f51f8bed5c765ad1d6516c3306f").unwrap(), ) .unwrap(); - let transport = Web3Transport::single_node(ETH_DEV_NODE, false); + + let node = HttpTransportNode { + uri: ETH_DEV_NODE.parse().unwrap(), + gui_auth: false, + }; + + let transport = Web3Transport::new_http(node); let web3 = Web3::new(transport); let ctx = MmCtxBuilder::new().into_mm_arc(); @@ -1134,11 +1156,7 @@ fn test_sign_verify_message() { swap_contract_address: Address::from_str(ETH_DEV_SWAP_CONTRACT).unwrap(), fallback_swap_contract: None, contract_supports_watchers: false, - web3_instances: vec![Web3Instance { - web3: web3.clone(), - is_parity: false, - }], - web3, + web3_instances: AsyncMutex::new(vec![Web3Instance { web3, is_parity: false }]), decimals: 18, gas_station_url: None, gas_station_decimals: ETH_GAS_STATION_DECIMALS, @@ -1169,7 +1187,15 @@ fn test_eth_extract_secret() { &hex::decode("809465b17d0a4ddb3e4c69e8f23c2cabad868f51f8bed5c765ad1d6516c3306f").unwrap(), ) .unwrap(); - let transport = Web3Transport::single_node("https://ropsten.infura.io/v3/c01c1b4cf66642528547624e1d6d9d6b", false); + let node = HttpTransportNode { + uri: "https://ropsten.infura.io/v3/c01c1b4cf66642528547624e1d6d9d6b" + .parse() + .unwrap(), + gui_auth: false, + }; + + let transport = Web3Transport::new_http(node); + let web3 = Web3::new(transport); let ctx = MmCtxBuilder::new().into_mm_arc(); @@ -1191,11 +1217,7 @@ fn test_eth_extract_secret() { fallback_swap_contract: None, contract_supports_watchers: false, ticker: "ETH".into(), - web3_instances: vec![Web3Instance { - web3: web3.clone(), - is_parity: true, - }], - web3, + web3_instances: AsyncMutex::new(vec![Web3Instance { web3, is_parity: true }]), ctx: ctx.weak(), required_confirmations: 1.into(), chain_id: None, diff --git a/mm2src/coins/eth/eth_wasm_tests.rs b/mm2src/coins/eth/eth_wasm_tests.rs index 1b4e7de7ed..f0c94aadfd 100644 --- a/mm2src/coins/eth/eth_wasm_tests.rs +++ b/mm2src/coins/eth/eth_wasm_tests.rs @@ -21,7 +21,11 @@ async fn test_send() { let seed = get_passphrase!(".env.client", "ALICE_PASSPHRASE").unwrap(); let keypair = key_pair_from_seed(&seed).unwrap(); let key_pair = KeyPair::from_secret_slice(keypair.private_ref()).unwrap(); - let transport = Web3Transport::single_node(ETH_DEV_NODE, false); + let node = HttpTransportNode { + uri: ETH_DEV_NODE.parse().unwrap(), + gui_auth: false, + }; + let transport = Web3Transport::new_http(node); let web3 = Web3::new(transport); let ctx = MmCtxBuilder::new().into_mm_arc(); let coin = EthCoin(Arc::new(EthCoinImpl { @@ -33,11 +37,7 @@ async fn test_send() { swap_contract_address: Address::from_str(ETH_DEV_SWAP_CONTRACT).unwrap(), fallback_swap_contract: None, contract_supports_watchers: false, - web3_instances: vec![Web3Instance { - web3: web3.clone(), - is_parity: false, - }], - web3, + web3_instances: AsyncMutex::new(vec![Web3Instance { web3, is_parity: false }]), decimals: 18, gas_station_url: None, gas_station_decimals: ETH_GAS_STATION_DECIMALS, diff --git a/mm2src/coins/eth/v2_activation.rs b/mm2src/coins/eth/v2_activation.rs index d19a6d0914..f340dec7f6 100644 --- a/mm2src/coins/eth/v2_activation.rs +++ b/mm2src/coins/eth/v2_activation.rs @@ -3,9 +3,11 @@ use super::*; use common::executor::AbortedError; use crypto::{CryptoCtxError, StandardHDCoinAddress}; use enum_derives::EnumFromTrait; +use instant::Instant; use mm2_err_handle::common_errors::WithInternal; #[cfg(target_arch = "wasm32")] use mm2_metamask::{from_metamask_error, MetamaskError, MetamaskRpcError, WithMetamaskRpcError}; +use web3_transport::websocket_transport::WebsocketTransport; #[derive(Clone, Debug, Deserialize, Display, EnumFromTrait, PartialEq, Serialize, SerializeErrorType)] #[serde(tag = "error_type", content = "error_data")] @@ -74,13 +76,13 @@ impl Default for EthPrivKeyActivationPolicy { #[derive(Clone, Debug, Deserialize, Serialize)] pub enum EthRpcMode { - Http, + Default, #[cfg(target_arch = "wasm32")] Metamask, } impl Default for EthRpcMode { - fn default() -> Self { EthRpcMode::Http } + fn default() -> Self { EthRpcMode::Default } } #[derive(Clone, Deserialize)] @@ -116,6 +118,7 @@ pub struct EthNode { #[serde(tag = "error_type", content = "error_data")] pub enum Erc20TokenActivationError { InternalError(String), + ClientConnectionFailed(String), CouldNotFetchBalance(String), } @@ -155,14 +158,22 @@ impl EthCoin { let conf = coin_conf(&ctx, &ticker); let decimals = match conf["decimals"].as_u64() { - None | Some(0) => get_token_decimals(&self.web3, protocol.token_addr) - .await - .map_err(Erc20TokenActivationError::InternalError)?, + None | Some(0) => get_token_decimals( + &self + .web3() + .await + .map_err(|e| Erc20TokenActivationError::ClientConnectionFailed(e.to_string()))?, + protocol.token_addr, + ) + .await + .map_err(Erc20TokenActivationError::InternalError)?, Some(d) => d as u8, }; let web3_instances: Vec = self .web3_instances + .lock() + .await .iter() .map(|node| { let mut transport = node.web3.transport().clone(); @@ -177,12 +188,6 @@ impl EthCoin { }) .collect(); - let mut transport = self.web3.transport().clone(); - if let Some(auth) = transport.gui_auth_validation_generator_as_mut() { - auth.coin_ticker = ticker.clone(); - } - let web3 = Web3::new(transport); - let required_confirmations = activation_params .required_confirmations .unwrap_or_else(|| conf["required_confirmations"].as_u64().unwrap_or(1)) @@ -208,8 +213,7 @@ impl EthCoin { gas_station_url: self.gas_station_url.clone(), gas_station_decimals: self.gas_station_decimals, gas_station_policy: self.gas_station_policy.clone(), - web3, - web3_instances, + web3_instances: AsyncMutex::new(web3_instances), history_sync_state: Mutex::new(self.history_sync_state.lock().unwrap().clone()), ctx: self.ctx.clone(), required_confirmations, @@ -255,16 +259,16 @@ pub async fn eth_coin_from_conf_and_request_v2( let chain_id = conf["chain_id"].as_u64(); - let (web3, web3_instances) = match (req.rpc_mode, &priv_key_policy) { + let web3_instances = match (req.rpc_mode, &priv_key_policy) { ( - EthRpcMode::Http, + EthRpcMode::Default, EthPrivKeyPolicy::Iguana(key_pair) | EthPrivKeyPolicy::HDWallet { activated_key: key_pair, .. }, - ) => build_http_transport(ctx, ticker.clone(), my_address_str, key_pair, &req.nodes).await?, - (EthRpcMode::Http, EthPrivKeyPolicy::Trezor) => { + ) => build_web3_instances(ctx, ticker.clone(), my_address_str, key_pair, req.nodes.clone()).await?, + (EthRpcMode::Default, EthPrivKeyPolicy::Trezor) => { return MmError::err(EthActivationV2Error::PrivKeyPolicyNotAllowed( PrivKeyPolicyNotAllowed::HardwareWalletNotSupported, )); @@ -277,7 +281,7 @@ pub async fn eth_coin_from_conf_and_request_v2( build_metamask_transport(ctx, ticker.clone(), chain_id).await? }, #[cfg(target_arch = "wasm32")] - (EthRpcMode::Http, EthPrivKeyPolicy::Metamask(_)) | (EthRpcMode::Metamask, _) => { + (EthRpcMode::Default, EthPrivKeyPolicy::Metamask(_)) | (EthRpcMode::Metamask, _) => { let error = r#"priv_key_policy="Metamask" and rpc_mode="Metamask" should be used both"#.to_string(); return MmError::err(EthActivationV2Error::ActivationFailed { ticker, error }); }, @@ -317,8 +321,7 @@ pub async fn eth_coin_from_conf_and_request_v2( gas_station_url: req.gas_station_url, gas_station_decimals: req.gas_station_decimals.unwrap_or(ETH_GAS_STATION_DECIMALS), gas_station_policy: req.gas_station_policy, - web3, - web3_instances, + web3_instances: AsyncMutex::new(web3_instances), history_sync_state: Mutex::new(HistorySyncState::NotEnabled), ctx: ctx.weak(), required_confirmations, @@ -383,58 +386,94 @@ pub(crate) async fn build_address_and_priv_key_policy( } } -async fn build_http_transport( +async fn build_web3_instances( ctx: &MmArc, coin_ticker: String, address: String, key_pair: &KeyPair, - eth_nodes: &[EthNode], -) -> MmResult<(Web3, Vec), EthActivationV2Error> { + mut eth_nodes: Vec, +) -> MmResult, EthActivationV2Error> { if eth_nodes.is_empty() { return MmError::err(EthActivationV2Error::AtLeastOneNodeRequired); } - let mut http_nodes = vec![]; - for node in eth_nodes { - let uri = node + let mut rng = small_rng(); + eth_nodes.as_mut_slice().shuffle(&mut rng); + drop_mutability!(eth_nodes); + + let event_handlers = rpc_event_handlers_for_eth_transport(ctx, coin_ticker.clone()); + + let mut web3_instances = Vec::with_capacity(eth_nodes.len()); + for eth_node in eth_nodes { + let uri: Uri = eth_node .url .parse() - .map_err(|_| EthActivationV2Error::InvalidPayload(format!("{} could not be parsed.", node.url)))?; + .map_err(|_| EthActivationV2Error::InvalidPayload(format!("{} could not be parsed.", eth_node.url)))?; - http_nodes.push(HttpTransportNode { - uri, - gui_auth: node.gui_auth, - }); - } + let transport = match uri.scheme_str() { + Some("ws") | Some("wss") => { + const TMP_SOCKET_CONNECTION: Duration = Duration::from_secs(20); - let mut rng = small_rng(); - http_nodes.as_mut_slice().shuffle(&mut rng); + let node = WebsocketTransportNode { + uri: uri.clone(), + gui_auth: eth_node.gui_auth, + }; - drop_mutability!(http_nodes); + let mut websocket_transport = WebsocketTransport::with_event_handlers(node, event_handlers.clone()); - let mut web3_instances = Vec::with_capacity(http_nodes.len()); - let event_handlers = rpc_event_handlers_for_eth_transport(ctx, coin_ticker.clone()); - for node in http_nodes.iter() { - let transport = build_single_http_transport( - coin_ticker.clone(), - address.clone(), - key_pair, - vec![node.clone()], - event_handlers.clone(), - ); + if eth_node.gui_auth { + websocket_transport.gui_auth_validation_generator = Some(GuiAuthValidationGenerator { + coin_ticker: coin_ticker.clone(), + secret: key_pair.secret().clone(), + address: address.clone(), + }); + } + + // Temporarily start the connection loop (we close the connection once we have the client version below). + // Ideally, it would be much better to not do this workaround, which requires a lot of refactoring or + // dropping websocket support on parity nodes. + let fut = websocket_transport + .clone() + .start_connection_loop(Some(Instant::now() + TMP_SOCKET_CONNECTION)); + let settings = AbortSettings::info_on_abort(format!("connection loop stopped for {:?}", uri)); + ctx.spawner().spawn_with_settings(fut, settings); + + Web3Transport::Websocket(websocket_transport) + }, + Some("http") | Some("https") => { + let node = HttpTransportNode { + uri, + gui_auth: eth_node.gui_auth, + }; + + build_http_transport( + coin_ticker.clone(), + address.clone(), + key_pair, + node, + event_handlers.clone(), + ) + }, + _ => { + return MmError::err(EthActivationV2Error::InvalidPayload(format!( + "Invalid node address '{uri}'. Only http(s) and ws(s) nodes are supported" + ))); + }, + }; let web3 = Web3::new(transport); let version = match web3.web3().client_version().await { Ok(v) => v, Err(e) => { - error!("Couldn't get client version for url {}: {}", node.uri, e); + error!("Couldn't get client version for url {}: {}", eth_node.url, e); continue; }, }; + web3_instances.push(Web3Instance { web3, is_parity: version.contains("Parity") || version.contains("parity"), - }) + }); } if web3_instances.is_empty() { @@ -443,27 +482,28 @@ async fn build_http_transport( ); } - let transport = build_single_http_transport(coin_ticker, address, key_pair, http_nodes, event_handlers); - let web3 = Web3::new(transport); - - Ok((web3, web3_instances)) + Ok(web3_instances) } -fn build_single_http_transport( +fn build_http_transport( coin_ticker: String, address: String, key_pair: &KeyPair, - nodes: Vec, + node: HttpTransportNode, event_handlers: Vec, ) -> Web3Transport { use crate::eth::web3_transport::http_transport::HttpTransport; - let mut http_transport = HttpTransport::with_event_handlers(nodes, event_handlers); - http_transport.gui_auth_validation_generator = Some(GuiAuthValidationGenerator { - coin_ticker, - secret: key_pair.secret().clone(), - address, - }); + let gui_auth = node.gui_auth; + let mut http_transport = HttpTransport::with_event_handlers(node, event_handlers); + + if gui_auth { + http_transport.gui_auth_validation_generator = Some(GuiAuthValidationGenerator { + coin_ticker, + secret: key_pair.secret().clone(), + address, + }); + } Web3Transport::from(http_transport) } @@ -472,11 +512,14 @@ async fn build_metamask_transport( ctx: &MmArc, coin_ticker: String, chain_id: u64, -) -> MmResult<(Web3, Vec), EthActivationV2Error> { +) -> MmResult, EthActivationV2Error> { let event_handlers = rpc_event_handlers_for_eth_transport(ctx, coin_ticker.clone()); let eth_config = web3_transport::metamask_transport::MetamaskEthConfig { chain_id }; - let web3 = Web3::new(Web3Transport::new_metamask(eth_config, event_handlers)?); + let web3 = Web3::new(Web3Transport::new_metamask_with_event_handlers( + eth_config, + event_handlers, + )?); // Check if MetaMask supports the given `chain_id`. // Please note that this request may take a long time. @@ -484,12 +527,9 @@ async fn build_metamask_transport( // MetaMask doesn't use Parity nodes. So `MetamaskTransport` doesn't support `parity_nextNonce` RPC. // An example of the `web3_clientVersion` RPC - `MetaMask/v10.22.1`. - let web3_instances = vec![Web3Instance { - web3: web3.clone(), - is_parity: false, - }]; + let web3_instances = vec![Web3Instance { web3, is_parity: false }]; - Ok((web3, web3_instances)) + Ok(web3_instances) } /// This method is based on the fact that `MetamaskTransport` tries to switch the `ChainId` @@ -508,7 +548,7 @@ async fn check_metamask_supports_chain_id( match web3.eth().chain_id().await { Ok(chain_id) if chain_id == U256::from(expected_chain_id) => Ok(()), - // The RPC client should have returned ChainId with which it has been created on [`Web3Transport::new_metamask`]. + // The RPC client should have returned ChainId with which it has been created on [`Web3Transport::new_metamask_with_event_handlers`]. Ok(unexpected_chain_id) => { let error = format!("Expected '{expected_chain_id}' ChainId, found '{unexpected_chain_id}'"); MmError::err(EthActivationV2Error::InternalError(error)) diff --git a/mm2src/coins/eth/web3_transport/http_transport.rs b/mm2src/coins/eth/web3_transport/http_transport.rs index 997ff034d7..c200f66c8a 100644 --- a/mm2src/coins/eth/web3_transport/http_transport.rs +++ b/mm2src/coins/eth/web3_transport/http_transport.rs @@ -1,29 +1,27 @@ -use crate::eth::{web3_transport::Web3SendOut, EthCoin, GuiAuthMessages, RpcTransportEventHandler, - RpcTransportEventHandlerShared, Web3RpcError}; +use crate::eth::web3_transport::handle_gui_auth_payload; +use crate::eth::{web3_transport::Web3SendOut, RpcTransportEventHandler, RpcTransportEventHandlerShared, Web3RpcError}; use common::APPLICATION_JSON; -use futures::lock::Mutex as AsyncMutex; use http::header::CONTENT_TYPE; use jsonrpc_core::{Call, Response}; use mm2_net::transport::{GuiAuthValidation, GuiAuthValidationGenerator}; use serde_json::Value as Json; -#[cfg(not(target_arch = "wasm32"))] use std::ops::Deref; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::ops::Deref; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; use web3::error::{Error, TransportError}; use web3::helpers::{build_request, to_result_from_output, to_string}; use web3::{RequestId, Transport}; -#[derive(Serialize, Clone)] +#[derive(Clone, Serialize)] pub struct AuthPayload<'a> { #[serde(flatten)] pub request: &'a Call, pub signed_message: GuiAuthValidation, } -/// Parse bytes RPC response into `Result`. +/// Deserialize bytes RPC response into `Result`. /// Implementation copied from Web3 HTTP transport -#[cfg(not(target_arch = "wasm32"))] -fn single_response(response: T, rpc_url: &str) -> Result +pub(crate) fn de_rpc_response(response: T, rpc_url: &str) -> Result where T: Deref + std::fmt::Debug, { @@ -42,18 +40,11 @@ where } } -#[derive(Debug)] -struct HttpTransportRpcClient(AsyncMutex); - -#[derive(Debug)] -struct HttpTransportRpcClientImpl { - nodes: Vec, -} - #[derive(Clone, Debug)] pub struct HttpTransport { id: Arc, - client: Arc, + pub(crate) last_request_failed: Arc, + node: HttpTransportNode, event_handlers: Vec, pub(crate) gui_auth_validation_generator: Option, } @@ -65,45 +56,26 @@ pub struct HttpTransportNode { } impl HttpTransport { - #[cfg(test)] #[inline] - pub fn new(nodes: Vec) -> Self { - let client_impl = HttpTransportRpcClientImpl { nodes }; + #[cfg(any(test, target_arch = "wasm32"))] + pub fn new(node: HttpTransportNode) -> Self { HttpTransport { id: Arc::new(AtomicUsize::new(0)), - client: Arc::new(HttpTransportRpcClient(AsyncMutex::new(client_impl))), + node, event_handlers: Default::default(), gui_auth_validation_generator: None, + last_request_failed: Arc::new(AtomicBool::new(false)), } } #[inline] - pub fn with_event_handlers( - nodes: Vec, - event_handlers: Vec, - ) -> Self { - let client_impl = HttpTransportRpcClientImpl { nodes }; + pub fn with_event_handlers(node: HttpTransportNode, event_handlers: Vec) -> Self { HttpTransport { id: Arc::new(AtomicUsize::new(0)), - client: Arc::new(HttpTransportRpcClient(AsyncMutex::new(client_impl))), + node, event_handlers, gui_auth_validation_generator: None, - } - } - - #[allow(dead_code)] - pub fn single_node(url: &'static str, gui_auth: bool) -> Self { - let nodes = vec![HttpTransportNode { - uri: url.parse().unwrap(), - gui_auth, - }]; - let client_impl = HttpTransportRpcClientImpl { nodes }; - - HttpTransport { - id: Arc::new(AtomicUsize::new(0)), - client: Arc::new(HttpTransportRpcClient(AsyncMutex::new(client_impl))), - event_handlers: Default::default(), - gui_auth_validation_generator: None, + last_request_failed: Arc::new(AtomicBool::new(false)), } } } @@ -119,72 +91,14 @@ impl Transport for HttpTransport { } #[cfg(not(target_arch = "wasm32"))] - fn send(&self, _id: RequestId, request: Call) -> Self::Out { - Box::pin(send_request( - request, - self.client.clone(), - self.event_handlers.clone(), - self.gui_auth_validation_generator.clone(), - )) - } + fn send(&self, _id: RequestId, request: Call) -> Self::Out { Box::pin(send_request(request, self.clone())) } #[cfg(target_arch = "wasm32")] - fn send(&self, _id: RequestId, request: Call) -> Self::Out { - Box::pin(send_request( - request, - self.client.clone(), - self.event_handlers.clone(), - self.gui_auth_validation_generator.clone(), - )) - } -} - -/// Generates a signed message and inserts it into request -/// payload if gui_auth is activated. Returns false on errors. -fn handle_gui_auth_payload_if_activated( - gui_auth_validation_generator: &Option, - node: &HttpTransportNode, - request: &Call, -) -> Result, Web3RpcError> { - if !node.gui_auth { - return Ok(None); - } - - let generator = match gui_auth_validation_generator.clone() { - Some(gen) => gen, - None => { - return Err(Web3RpcError::Internal(format!( - "GuiAuthValidationGenerator is not provided for {:?} node", - node - ))); - }, - }; - - let signed_message = match EthCoin::generate_gui_auth_signed_validation(generator) { - Ok(t) => t, - Err(e) => { - return Err(Web3RpcError::Internal(format!( - "GuiAuth signed message generation failed for {:?} node, error: {:?}", - node, e - ))); - }, - }; - - let auth_request = AuthPayload { - request, - signed_message, - }; - - Ok(Some(to_string(&auth_request))) + fn send(&self, _id: RequestId, request: Call) -> Self::Out { Box::pin(send_request(request, self.clone())) } } #[cfg(not(target_arch = "wasm32"))] -async fn send_request( - request: Call, - client: Arc, - event_handlers: Vec, - gui_auth_validation_generator: Option, -) -> Result { +async fn send_request(request: Call, transport: HttpTransport) -> Result { use common::executor::Timer; use common::log::warn; use futures::future::{select, Either}; @@ -194,129 +108,107 @@ async fn send_request( const REQUEST_TIMEOUT_S: f64 = 20.; - let mut errors = Vec::new(); - - let serialized_request = to_string(&request); + let mut serialized_request = to_string(&request); - let mut client_impl = client.0.lock().await; - - for (i, node) in client_impl.nodes.clone().iter().enumerate() { - let serialized_request = - match handle_gui_auth_payload_if_activated(&gui_auth_validation_generator, node, &request) { - Ok(Some(r)) => r, - Ok(None) => serialized_request.clone(), - Err(e) => { - errors.push(e); - continue; - }, - }; - - event_handlers.on_outgoing_request(serialized_request.as_bytes()); - - let mut req = http::Request::new(serialized_request.into_bytes()); - *req.method_mut() = http::Method::POST; - *req.uri_mut() = node.uri.clone(); - req.headers_mut() - .insert(CONTENT_TYPE, HeaderValue::from_static(APPLICATION_JSON)); - let timeout = Timer::sleep(REQUEST_TIMEOUT_S); - let req = Box::pin(slurp_req(req)); - let rc = select(req, timeout).await; - let res = match rc { - Either::Left((r, _t)) => r, - Either::Right((_t, _r)) => { - let (method, id) = match &request { - Call::MethodCall(m) => (m.method.clone(), m.id.clone()), - Call::Notification(n) => (n.method.clone(), jsonrpc_core::Id::Null), - Call::Invalid { id } => ("Invalid call".to_string(), id.clone()), - }; - let error = format!( - "Error requesting '{}': {}s timeout expired, method: '{}', id: {:?}", - node.uri, REQUEST_TIMEOUT_S, method, id - ); - warn!("{}", error); - errors.push(Web3RpcError::Transport(error)); - continue; + if transport.node.gui_auth { + match handle_gui_auth_payload(&transport.gui_auth_validation_generator, &request) { + Ok(r) => serialized_request = r, + Err(e) => { + return Err(request_failed_error(request, e)); }, }; + } - let (status, _headers, body) = match res { - Ok(r) => r, - Err(err) => { - errors.push(Web3RpcError::Transport(err.to_string())); - continue; - }, - }; + transport + .event_handlers + .on_outgoing_request(serialized_request.as_bytes()); + + let mut req = http::Request::new(serialized_request.into_bytes()); + *req.method_mut() = http::Method::POST; + *req.uri_mut() = transport.node.uri.clone(); + req.headers_mut() + .insert(CONTENT_TYPE, HeaderValue::from_static(APPLICATION_JSON)); + let timeout = Timer::sleep(REQUEST_TIMEOUT_S); + let req = Box::pin(slurp_req(req)); + let rc = select(req, timeout).await; + let res = match rc { + Either::Left((r, _t)) => r, + Either::Right((_t, _r)) => { + let (method, id) = match &request { + Call::MethodCall(m) => (m.method.clone(), m.id.clone()), + Call::Notification(n) => (n.method.clone(), jsonrpc_core::Id::Null), + Call::Invalid { id } => ("Invalid call".to_string(), id.clone()), + }; + let error = format!( + "Error requesting '{}': {}s timeout expired, method: '{}', id: {:?}", + transport.node.uri, REQUEST_TIMEOUT_S, method, id + ); + warn!("{}", error); + return Err(request_failed_error(request, Web3RpcError::Transport(error))); + }, + }; + + let (status, _headers, body) = match res { + Ok(r) => r, + Err(err) => { + return Err(request_failed_error(request, Web3RpcError::Transport(err.to_string()))); + }, + }; - event_handlers.on_incoming_response(&body); + transport.event_handlers.on_incoming_response(&body); - if !status.is_success() { - errors.push(Web3RpcError::Transport(format!( + if !status.is_success() { + return Err(request_failed_error( + request, + Web3RpcError::Transport(format!( "Server: '{}', response !200: {}, {}", - node.uri, + transport.node.uri, status, binprint(&body, b'.') - ))); - continue; - } - - let res = match single_response(body, &node.uri.to_string()) { - Ok(r) => r, - Err(err) => { - errors.push(Web3RpcError::InvalidResponse(format!( - "Server: '{}', error: {}", - node.uri, err - ))); - continue; - }, - }; - - client_impl.nodes.rotate_left(i); - - return Ok(res); + )), + )); } - Err(request_failed_error(&request, &errors)) + let res = match de_rpc_response(body, &transport.node.uri.to_string()) { + Ok(r) => r, + Err(err) => { + return Err(request_failed_error( + request, + Web3RpcError::InvalidResponse(format!("Server: '{}', error: {}", transport.node.uri, err)), + )); + }, + }; + + Ok(res) } #[cfg(target_arch = "wasm32")] -async fn send_request( - request: Call, - client: Arc, - event_handlers: Vec, - gui_auth_validation_generator: Option, -) -> Result { - let serialized_request = to_string(&request); - - let mut errors = Vec::new(); - let mut client_impl = client.0.lock().await; - - for (i, node) in client_impl.nodes.clone().iter().enumerate() { - let serialized_request = - match handle_gui_auth_payload_if_activated(&gui_auth_validation_generator, node, &request) { - Ok(Some(r)) => r, - Ok(None) => serialized_request.clone(), - Err(e) => { - errors.push(e); - continue; - }, - }; - - match send_request_once(serialized_request, &node.uri, &event_handlers).await { - Ok(response_json) => { - client_impl.nodes.rotate_left(i); - return Ok(response_json); +async fn send_request(request: Call, transport: HttpTransport) -> Result { + let mut serialized_request = to_string(&request); + + if transport.node.gui_auth { + match handle_gui_auth_payload(&transport.gui_auth_validation_generator, &request) { + Ok(r) => serialized_request = r, + Err(e) => { + return Err(request_failed_error( + request, + Web3RpcError::Transport(format!("Server: '{}', error: {}", transport.node.uri, e)), + )); }, - Err(Error::Transport(e)) => { - errors.push(Web3RpcError::Transport(format!("Server: '{}', error: {}", node.uri, e))) - }, - Err(e) => errors.push(Web3RpcError::InvalidResponse(format!( - "Server: '{}', error: {}", - node.uri, e - ))), - } + }; } - Err(request_failed_error(&request, &errors)) + match send_request_once(serialized_request, &transport.node.uri, &transport.event_handlers).await { + Ok(response_json) => Ok(response_json), + Err(Error::Transport(e)) => Err(request_failed_error( + request, + Web3RpcError::Transport(format!("Server: '{}', error: {}", transport.node.uri, e)), + )), + Err(e) => Err(request_failed_error( + request, + Web3RpcError::InvalidResponse(format!("Server: '{}', error: {}", transport.node.uri, e)), + )), + } } #[cfg(target_arch = "wasm32")] @@ -360,8 +252,7 @@ async fn send_request_once( } } -fn request_failed_error(request: &Call, errors: &[Web3RpcError]) -> Error { - let errors: String = errors.iter().map(|e| format!("{:?}; ", e)).collect(); - let error = format!("request {:?} failed: {}", request, errors); +fn request_failed_error(request: Call, error: Web3RpcError) -> Error { + let error = format!("request {:?} failed: {}", request, error); Error::Transport(TransportError::Message(error)) } diff --git a/mm2src/coins/eth/web3_transport/metamask_transport.rs b/mm2src/coins/eth/web3_transport/metamask_transport.rs index 5fe71d8dc2..a586f9e7f9 100644 --- a/mm2src/coins/eth/web3_transport/metamask_transport.rs +++ b/mm2src/coins/eth/web3_transport/metamask_transport.rs @@ -4,6 +4,7 @@ use jsonrpc_core::Call; use mm2_metamask::{detect_metamask_provider, Eip1193Provider, MetamaskResult, MetamaskSession}; use serde_json::Value as Json; use std::fmt; +use std::sync::atomic::AtomicBool; use std::sync::Arc; use web3::{RequestId, Transport}; @@ -15,6 +16,7 @@ pub(crate) struct MetamaskEthConfig { #[derive(Clone)] pub(crate) struct MetamaskTransport { inner: Arc, + pub(crate) last_request_failed: Arc, } struct MetamaskTransportInner { @@ -35,7 +37,10 @@ impl MetamaskTransport { eip1193, _event_handlers: event_handlers, }; - Ok(MetamaskTransport { inner: Arc::new(inner) }) + Ok(MetamaskTransport { + inner: Arc::new(inner), + last_request_failed: Arc::new(AtomicBool::new(false)), + }) } } diff --git a/mm2src/coins/eth/web3_transport/mod.rs b/mm2src/coins/eth/web3_transport/mod.rs index f8b7d62fbd..1858dddfc4 100644 --- a/mm2src/coins/eth/web3_transport/mod.rs +++ b/mm2src/coins/eth/web3_transport/mod.rs @@ -1,4 +1,3 @@ -use crate::RpcTransportEventHandlerShared; use ethereum_types::U256; use futures::future::BoxFuture; use jsonrpc_core::Call; @@ -6,52 +5,71 @@ use jsonrpc_core::Call; use mm2_net::transport::GuiAuthValidationGenerator; use serde_json::Value as Json; use serde_json::Value; -use web3::api::Namespace; -use web3::helpers::{self, CallFuture}; -use web3::types::BlockNumber; +use std::sync::atomic::Ordering; +use web3::helpers::to_string; use web3::{Error, RequestId, Transport}; +use self::http_transport::AuthPayload; +use super::{EthCoin, GuiAuthMessages, Web3RpcError}; +use crate::RpcTransportEventHandlerShared; + pub(crate) mod http_transport; #[cfg(target_arch = "wasm32")] pub(crate) mod metamask_transport; +pub(crate) mod websocket_transport; -type Web3SendOut = BoxFuture<'static, Result>; +pub(crate) type Web3SendOut = BoxFuture<'static, Result>; #[derive(Clone, Debug)] pub(crate) enum Web3Transport { Http(http_transport::HttpTransport), + Websocket(websocket_transport::WebsocketTransport), #[cfg(target_arch = "wasm32")] Metamask(metamask_transport::MetamaskTransport), } impl Web3Transport { - pub fn new_http( - nodes: Vec, + pub fn new_http_with_event_handlers( + node: http_transport::HttpTransportNode, event_handlers: Vec, ) -> Web3Transport { - http_transport::HttpTransport::with_event_handlers(nodes, event_handlers).into() + http_transport::HttpTransport::with_event_handlers(node, event_handlers).into() } #[cfg(target_arch = "wasm32")] - pub(crate) fn new_metamask( + pub(crate) fn new_metamask_with_event_handlers( eth_config: metamask_transport::MetamaskEthConfig, event_handlers: Vec, ) -> MetamaskResult { Ok(metamask_transport::MetamaskTransport::detect(eth_config, event_handlers)?.into()) } - #[cfg(test)] - pub fn with_nodes(nodes: Vec) -> Web3Transport { - http_transport::HttpTransport::new(nodes).into() + pub fn is_last_request_failed(&self) -> bool { + match self { + Web3Transport::Http(http) => http.last_request_failed.load(Ordering::SeqCst), + Web3Transport::Websocket(websocket) => websocket.last_request_failed.load(Ordering::SeqCst), + #[cfg(target_arch = "wasm32")] + Web3Transport::Metamask(metamask) => metamask.last_request_failed.load(Ordering::SeqCst), + } + } + + fn set_last_request_failed(&self, val: bool) { + match self { + Web3Transport::Http(http) => http.last_request_failed.store(val, Ordering::SeqCst), + Web3Transport::Websocket(websocket) => websocket.last_request_failed.store(val, Ordering::SeqCst), + #[cfg(target_arch = "wasm32")] + Web3Transport::Metamask(metamask) => metamask.last_request_failed.store(val, Ordering::SeqCst), + } } - #[allow(dead_code)] - pub fn single_node(url: &'static str, gui_auth: bool) -> Self { - http_transport::HttpTransport::single_node(url, gui_auth).into() + #[cfg(any(test, target_arch = "wasm32"))] + pub fn new_http(node: http_transport::HttpTransportNode) -> Web3Transport { + http_transport::HttpTransport::new(node).into() } pub fn gui_auth_validation_generator_as_mut(&mut self) -> Option<&mut GuiAuthValidationGenerator> { match self { Web3Transport::Http(http) => http.gui_auth_validation_generator.as_mut(), + Web3Transport::Websocket(websocket) => websocket.gui_auth_validation_generator.as_mut(), #[cfg(target_arch = "wasm32")] Web3Transport::Metamask(_) => None, } @@ -64,17 +82,29 @@ impl Transport for Web3Transport { fn prepare(&self, method: &str, params: Vec) -> (RequestId, Call) { match self { Web3Transport::Http(http) => http.prepare(method, params), + Web3Transport::Websocket(websocket) => websocket.prepare(method, params), #[cfg(target_arch = "wasm32")] Web3Transport::Metamask(metamask) => metamask.prepare(method, params), } } fn send(&self, id: RequestId, request: Call) -> Self::Out { - match self { - Web3Transport::Http(http) => http.send(id, request), - #[cfg(target_arch = "wasm32")] - Web3Transport::Metamask(metamask) => metamask.send(id, request), - } + let selfi = self.clone(); + let fut = async move { + let result = match &selfi { + Web3Transport::Http(http) => http.send(id, request), + Web3Transport::Websocket(websocket) => websocket.send(id, request), + #[cfg(target_arch = "wasm32")] + Web3Transport::Metamask(metamask) => metamask.send(id, request), + } + .await; + + selfi.set_last_request_failed(result.is_err()); + + result + }; + + Box::pin(fut) } } @@ -82,29 +112,15 @@ impl From for Web3Transport { fn from(http: http_transport::HttpTransport) -> Self { Web3Transport::Http(http) } } +impl From for Web3Transport { + fn from(websocket: websocket_transport::WebsocketTransport) -> Self { Web3Transport::Websocket(websocket) } +} + #[cfg(target_arch = "wasm32")] impl From for Web3Transport { fn from(metamask: metamask_transport::MetamaskTransport) -> Self { Web3Transport::Metamask(metamask) } } -/// eth_feeHistory support is missing even in the latest rust-web3 -/// It's the custom namespace implementing it -#[derive(Debug, Clone)] -pub struct EthFeeHistoryNamespace { - transport: T, -} - -impl Namespace for EthFeeHistoryNamespace { - fn new(transport: T) -> Self - where - Self: Sized, - { - Self { transport } - } - - fn transport(&self) -> &T { &self.transport } -} - #[derive(Debug, Deserialize)] pub struct FeeHistoryResult { #[serde(rename = "oldestBlock")] @@ -113,17 +129,34 @@ pub struct FeeHistoryResult { pub base_fee_per_gas: Vec, } -impl EthFeeHistoryNamespace { - pub fn eth_fee_history( - &self, - count: U256, - block: BlockNumber, - reward_percentiles: &[f64], - ) -> CallFuture { - let count = helpers::serialize(&count); - let block = helpers::serialize(&block); - let reward_percentiles = helpers::serialize(&reward_percentiles); - let params = vec![count, block, reward_percentiles]; - CallFuture::new(self.transport.execute("eth_feeHistory", params)) - } +/// Generates a signed message and inserts it into the request payload. +pub(super) fn handle_gui_auth_payload( + gui_auth_validation_generator: &Option, + request: &Call, +) -> Result { + let generator = match gui_auth_validation_generator.clone() { + Some(gen) => gen, + None => { + return Err(Web3RpcError::Internal( + "GuiAuthValidationGenerator is not provided for".to_string(), + )); + }, + }; + + let signed_message = match EthCoin::generate_gui_auth_signed_validation(generator) { + Ok(t) => t, + Err(e) => { + return Err(Web3RpcError::Internal(format!( + "GuiAuth signed message generation failed. Error: {:?}", + e + ))); + }, + }; + + let auth_request = AuthPayload { + request, + signed_message, + }; + + Ok(to_string(&auth_request)) } diff --git a/mm2src/coins/eth/web3_transport/websocket_transport.rs b/mm2src/coins/eth/web3_transport/websocket_transport.rs new file mode 100644 index 0000000000..f458aacc67 --- /dev/null +++ b/mm2src/coins/eth/web3_transport/websocket_transport.rs @@ -0,0 +1,395 @@ +//! This module offers a transport layer for managing request-response style communication with Ethereum +//! nodes using websockets that can work concurrently. +//! +//! In comparison to HTTP transport, this approach proves to be much quicker (low-latency) and consumes +//! less bandwidth. This efficiency is achieved by avoiding the handling of TCP handshakes (connection reusability) +//! for each request. + +use super::handle_gui_auth_payload; +use super::http_transport::de_rpc_response; +use crate::eth::eth_rpc::ETH_RPC_REQUEST_TIMEOUT; +use crate::eth::web3_transport::Web3SendOut; +use crate::eth::{EthCoin, RpcTransportEventHandlerShared}; +use crate::{MmCoin, RpcTransportEventHandler}; +use common::executor::{AbortSettings, SpawnAbortable, Timer}; +use common::expirable_map::ExpirableMap; +use common::log; +use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender}; +use futures::channel::oneshot; +use futures::lock::Mutex as AsyncMutex; +use futures_ticker::Ticker; +use futures_util::{FutureExt, SinkExt, StreamExt}; +use instant::{Duration, Instant}; +use jsonrpc_core::Call; +use mm2_net::transport::GuiAuthValidationGenerator; +use std::sync::atomic::AtomicBool; +use std::sync::{atomic::{AtomicUsize, Ordering}, + Arc}; +use tokio_tungstenite_wasm::WebSocketStream; +use web3::error::{Error, TransportError}; +use web3::helpers::to_string; +use web3::{helpers::build_request, RequestId, Transport}; + +const MAX_ATTEMPTS: u32 = 3; +const SLEEP_DURATION: f64 = 1.; +const KEEPALIVE_DURATION: Duration = Duration::from_secs(10); + +#[derive(Clone, Debug)] +pub(crate) struct WebsocketTransportNode { + pub(crate) uri: http::Uri, + pub(crate) gui_auth: bool, +} + +#[derive(Clone, Debug)] +pub struct WebsocketTransport { + request_id: Arc, + pub(crate) last_request_failed: Arc, + node: WebsocketTransportNode, + event_handlers: Vec, + pub(crate) gui_auth_validation_generator: Option, + controller_channel: Arc, + connection_guard: Arc>, +} + +#[derive(Debug)] +struct ControllerChannel { + tx: Arc>>, + rx: Arc>>, +} + +enum ControllerMessage { + Request(WsRequest), + Close, +} + +#[derive(Debug)] +struct WsRequest { + serialized_request: String, + request_id: RequestId, + response_notifier: oneshot::Sender>, +} + +enum OuterAction { + None, + Continue, + Break, + Return, +} + +impl WebsocketTransport { + pub(crate) fn with_event_handlers( + node: WebsocketTransportNode, + event_handlers: Vec, + ) -> Self { + let (req_tx, req_rx) = futures::channel::mpsc::unbounded(); + + WebsocketTransport { + node, + event_handlers, + request_id: Arc::new(AtomicUsize::new(1)), + controller_channel: ControllerChannel { + tx: Arc::new(AsyncMutex::new(req_tx)), + rx: Arc::new(AsyncMutex::new(req_rx)), + } + .into(), + connection_guard: Arc::new(AsyncMutex::new(())), + gui_auth_validation_generator: None, + last_request_failed: Arc::new(AtomicBool::new(false)), + } + } + + async fn handle_keepalive( + &self, + wsocket: &mut WebSocketStream, + response_notifiers: &mut ExpirableMap>>, + expires_at: Option, + ) -> OuterAction { + const SIMPLE_REQUEST: &str = r#"{"jsonrpc":"2.0","method":"net_version","params":[],"id": 0 }"#; + + if let Some(expires_at) = expires_at { + if Instant::now() >= expires_at { + log::debug!("Dropping temporary connection for {:?}", self.node.uri.to_string()); + return OuterAction::Break; + } + } + + // Drop expired response notifier channels + response_notifiers.clear_expired_entries(); + + let mut should_continue = Default::default(); + for _ in 0..MAX_ATTEMPTS { + match wsocket + .send(tokio_tungstenite_wasm::Message::Text(SIMPLE_REQUEST.to_string())) + .await + { + Ok(_) => { + should_continue = false; + break; + }, + Err(e) => { + log::error!("{e}"); + should_continue = true; + }, + }; + + Timer::sleep(SLEEP_DURATION).await; + } + + if should_continue { + return OuterAction::Continue; + } + + OuterAction::None + } + + async fn handle_send_request( + &self, + request: Option, + wsocket: &mut WebSocketStream, + response_notifiers: &mut ExpirableMap>>, + ) -> OuterAction { + match request { + Some(ControllerMessage::Request(WsRequest { + request_id, + serialized_request, + response_notifier, + })) => { + response_notifiers.insert( + request_id, + response_notifier, + // Since request will be cancelled when timeout occurs, we are free to drop its state. + ETH_RPC_REQUEST_TIMEOUT, + ); + + let mut should_continue = Default::default(); + for _ in 0..MAX_ATTEMPTS { + match wsocket + .send(tokio_tungstenite_wasm::Message::Text(serialized_request.clone())) + .await + { + Ok(_) => { + should_continue = false; + break; + }, + Err(e) => { + log::error!("{e}"); + should_continue = true; + }, + } + + Timer::sleep(SLEEP_DURATION).await; + } + + if should_continue { + let _ = response_notifiers.remove(&request_id); + return OuterAction::Continue; + } + }, + Some(ControllerMessage::Close) => { + return OuterAction::Break; + }, + _ => {}, + } + + OuterAction::None + } + + async fn handle_response( + &self, + message: Option>, + response_notifiers: &mut ExpirableMap>>, + ) -> OuterAction { + match message { + Some(Ok(tokio_tungstenite_wasm::Message::Text(inc_event))) => { + if let Ok(inc_event) = serde_json::from_str::(&inc_event) { + if !inc_event.is_object() { + return OuterAction::Continue; + } + + if let Some(id) = inc_event.get("id") { + // just to ensure we don't have outdated entries + response_notifiers.clear_expired_entries(); + + let request_id = id.as_u64().unwrap_or_default() as usize; + + if let Some(notifier) = response_notifiers.remove(&request_id) { + let mut res_bytes: Vec = Vec::new(); + if serde_json::to_writer(&mut res_bytes, &inc_event).is_ok() { + notifier.send(res_bytes).expect("receiver channel must be alive"); + } + } + } + } + }, + Some(Ok(tokio_tungstenite_wasm::Message::Binary(_))) => return OuterAction::Continue, + Some(Ok(tokio_tungstenite_wasm::Message::Close(_))) => return OuterAction::Break, + Some(Err(e)) => { + log::error!("{e}"); + return OuterAction::Return; + }, + None => return OuterAction::Continue, + }; + + OuterAction::None + } + + async fn attempt_to_establish_socket_connection( + &self, + max_attempts: u32, + mut sleep_duration_on_failure: f64, + ) -> tokio_tungstenite_wasm::Result { + const MAX_SLEEP_DURATION: f64 = 32.0; + let mut attempts = 0; + + loop { + match tokio_tungstenite_wasm::connect(self.node.uri.to_string()).await { + Ok(ws) => return Ok(ws), + Err(e) => { + attempts += 1; + if attempts > max_attempts { + return Err(e); + } + + Timer::sleep(sleep_duration_on_failure).await; + sleep_duration_on_failure = (sleep_duration_on_failure * 2.0).min(MAX_SLEEP_DURATION); + }, + }; + } + } + + pub(crate) async fn start_connection_loop(self, expires_at: Option) { + let _guard = self.connection_guard.lock().await; + + // List of awaiting requests + let mut response_notifiers: ExpirableMap>> = ExpirableMap::default(); + + let mut wsocket = match self + .attempt_to_establish_socket_connection(MAX_ATTEMPTS, SLEEP_DURATION) + .await + { + Ok(ws) => ws, + Err(e) => { + log::error!("Connection could not established for {}. Error {e}", self.node.uri); + return; + }, + }; + + let mut keepalive_interval = Ticker::new(KEEPALIVE_DURATION); + let mut req_rx = self.controller_channel.rx.lock().await; + + loop { + futures_util::select! { + _ = keepalive_interval.next().fuse() => { + match self.handle_keepalive(&mut wsocket, &mut response_notifiers, expires_at).await { + OuterAction::None => {}, + OuterAction::Continue => continue, + OuterAction::Break => break, + OuterAction::Return => return, + } + } + + request = req_rx.next().fuse() => { + match self.handle_send_request(request, &mut wsocket, &mut response_notifiers).await { + OuterAction::None => {}, + OuterAction::Continue => continue, + OuterAction::Break => break, + OuterAction::Return => return, + } + } + + message = wsocket.next().fuse() => { + match self.handle_response(message, &mut response_notifiers).await { + OuterAction::None => {}, + OuterAction::Continue => continue, + OuterAction::Break => break, + OuterAction::Return => return, + } + } + } + } + } + + pub(crate) async fn stop_connection_loop(&self) { + let mut tx = self.controller_channel.tx.lock().await; + tx.send(ControllerMessage::Close) + .await + .expect("receiver channel must be alive"); + } + + pub(crate) fn maybe_spawn_connection_loop(&self, coin: EthCoin) { + self.maybe_spawn_connection_loop_inner(coin, None) + } + + pub(crate) fn maybe_spawn_temporary_connection_loop(&self, coin: EthCoin, expires_at: Instant) { + self.maybe_spawn_connection_loop_inner(coin, Some(expires_at)) + } + + fn maybe_spawn_connection_loop_inner(&self, coin: EthCoin, expires_at: Option) { + // if we can acquire the lock here, it means connection loop is not alive + if self.connection_guard.try_lock().is_some() { + let fut = self.clone().start_connection_loop(expires_at); + let settings = AbortSettings::info_on_abort(format!("connection loop stopped for {:?}", self.node.uri)); + coin.spawner().spawn_with_settings(fut, settings); + } + } +} + +async fn send_request( + transport: WebsocketTransport, + request: Call, + request_id: RequestId, + event_handlers: Vec, +) -> Result { + let mut serialized_request = to_string(&request); + + if transport.node.gui_auth { + match handle_gui_auth_payload(&transport.gui_auth_validation_generator, &request) { + Ok(r) => serialized_request = r, + Err(e) => { + return Err(Error::Transport(TransportError::Message(format!( + "Couldn't generate signed message payload for {:?}. Error: {e}", + request + )))); + }, + }; + } + + let mut tx = transport.controller_channel.tx.lock().await; + + let (notification_sender, notification_receiver) = futures::channel::oneshot::channel::>(); + + event_handlers.on_outgoing_request(serialized_request.as_bytes()); + + tx.send(ControllerMessage::Request(WsRequest { + request_id, + serialized_request, + response_notifier: notification_sender, + })) + .await + .map_err(|e| Error::Transport(TransportError::Message(e.to_string())))?; + + if let Ok(response) = notification_receiver.await { + event_handlers.on_incoming_response(&response); + return de_rpc_response(response, &transport.node.uri.to_string()); + }; + + Err(Error::Transport(TransportError::Message(format!( + "Sending {:?} failed.", + request + )))) +} + +impl Transport for WebsocketTransport { + type Out = Web3SendOut; + + fn prepare(&self, method: &str, params: Vec) -> (RequestId, Call) { + let request_id = self.request_id.fetch_add(1, Ordering::SeqCst); + let request = build_request(request_id, method, params); + + (request_id, request) + } + + fn send(&self, id: RequestId, request: Call) -> Self::Out { + Box::pin(send_request(self.clone(), request, id, self.event_handlers.clone())) + } +} diff --git a/mm2src/coins/nft.rs b/mm2src/coins/nft.rs index 613603ebd2..0584d9fa13 100644 --- a/mm2src/coins/nft.rs +++ b/mm2src/coins/nft.rs @@ -729,7 +729,7 @@ async fn get_moralis_nft_transfers( async fn get_fee_details(eth_coin: &EthCoin, transaction_hash: &str) -> Option { let hash = H256::from_str(transaction_hash).ok()?; - let receipt = eth_coin.web3.eth().transaction_receipt(hash).await.ok()?; + let receipt = eth_coin.web3().await.ok()?.eth().transaction_receipt(hash).await.ok()?; let fee_coin = match eth_coin.coin_type { EthCoinType::Eth => eth_coin.ticker(), EthCoinType::Erc20 { .. } => return None, @@ -742,7 +742,9 @@ async fn get_fee_details(eth_coin: &EthCoin, transaction_hash: &str) -> Option EthTxFeeDetails::new(gas_used, gas_price, fee_coin).ok(), None => { let web3_tx = eth_coin - .web3 + .web3() + .await + .ok()? .eth() .transaction(TransactionId::Hash(hash)) .await diff --git a/mm2src/coins_activation/src/erc20_token_activation.rs b/mm2src/coins_activation/src/erc20_token_activation.rs index 9a5dff22d9..851d188267 100644 --- a/mm2src/coins_activation/src/erc20_token_activation.rs +++ b/mm2src/coins_activation/src/erc20_token_activation.rs @@ -21,7 +21,8 @@ impl From for EnableTokenError { fn from(err: Erc20TokenActivationError) -> Self { match err { Erc20TokenActivationError::InternalError(e) => EnableTokenError::Internal(e), - Erc20TokenActivationError::CouldNotFetchBalance(e) => EnableTokenError::Transport(e), + Erc20TokenActivationError::CouldNotFetchBalance(e) + | Erc20TokenActivationError::ClientConnectionFailed(e) => EnableTokenError::Transport(e), } } } diff --git a/mm2src/coins_activation/src/eth_with_token_activation.rs b/mm2src/coins_activation/src/eth_with_token_activation.rs index 7d1b695f55..abbc17caa0 100644 --- a/mm2src/coins_activation/src/eth_with_token_activation.rs +++ b/mm2src/coins_activation/src/eth_with_token_activation.rs @@ -80,7 +80,8 @@ impl From for InitTokensAsMmCoinsError { fn from(error: Erc20TokenActivationError) -> Self { match error { Erc20TokenActivationError::InternalError(e) => InitTokensAsMmCoinsError::Internal(e), - Erc20TokenActivationError::CouldNotFetchBalance(e) => InitTokensAsMmCoinsError::CouldNotFetchBalance(e), + Erc20TokenActivationError::CouldNotFetchBalance(e) + | Erc20TokenActivationError::ClientConnectionFailed(e) => InitTokensAsMmCoinsError::CouldNotFetchBalance(e), } } } diff --git a/mm2src/common/Cargo.toml b/mm2src/common/Cargo.toml index 47608b2b02..4637818b35 100644 --- a/mm2src/common/Cargo.toml +++ b/mm2src/common/Cargo.toml @@ -35,6 +35,7 @@ parking_lot = { version = "0.12.0", features = ["nightly"] } parking_lot_core = { version = "0.6", features = ["nightly"] } primitive-types = "0.11.1" rand = { version = "0.7", features = ["std", "small_rng"] } +rustc-hash = "1.1.0" regex = "1" serde = "1" serde_derive = "1" diff --git a/mm2src/common/common.rs b/mm2src/common/common.rs index 79132a6d39..6a89bfeff7 100644 --- a/mm2src/common/common.rs +++ b/mm2src/common/common.rs @@ -128,6 +128,7 @@ pub mod crash_reports; pub mod custom_futures; pub mod custom_iter; #[path = "executor/mod.rs"] pub mod executor; +pub mod expirable_map; pub mod number_type_casting; pub mod password_policy; pub mod seri; diff --git a/mm2src/common/expirable_map.rs b/mm2src/common/expirable_map.rs new file mode 100644 index 0000000000..73ebc1f580 --- /dev/null +++ b/mm2src/common/expirable_map.rs @@ -0,0 +1,107 @@ +//! This module provides a cross-compatible map that associates values with keys and supports expiring entries. +//! +//! Designed for performance-oriented use-cases utilizing `FxHashMap` under the hood, +//! and is not suitable for cryptographic purposes. + +use instant::{Duration, Instant}; +use rustc_hash::FxHashMap; +use std::hash::Hash; + +#[derive(Clone, Debug)] +pub struct ExpirableEntry { + pub(crate) value: V, + pub(crate) expires_at: Instant, +} + +impl ExpirableEntry { + pub fn get_element(&self) -> &V { &self.value } + + pub fn update_expiration(&mut self, expires_at: Instant) { self.expires_at = expires_at } +} + +impl Default for ExpirableMap { + fn default() -> Self { Self::new() } +} + +/// A map that allows associating values with keys and expiring entries. +/// It is important to note that this implementation does not automatically +/// remove any entries; it is the caller's responsibility to invoke `clear_expired_entries` +/// at specified intervals. +/// +/// WARNING: This is designed for performance-oriented use-cases utilizing `FxHashMap` +/// under the hood and is not suitable for cryptographic purposes. +#[derive(Clone, Debug)] +pub struct ExpirableMap(FxHashMap>); + +impl ExpirableMap { + /// Creates a new empty `ExpirableMap` + #[inline] + pub fn new() -> Self { Self(FxHashMap::default()) } + + /// Inserts a key-value pair with an expiration duration. + /// + /// If a value already exists for the given key, it will be updated and then + /// the old one will be returned. + pub fn insert(&mut self, k: K, v: V, exp: Duration) -> Option { + let entry = ExpirableEntry { + expires_at: Instant::now() + exp, + value: v, + }; + + self.0.insert(k, entry).map(|v| v.value) + } + + /// Removes expired entries from the map. + pub fn clear_expired_entries(&mut self) { self.0.retain(|_k, v| Instant::now() < v.expires_at); } + + // Removes a key-value pair from the map and returns the associated value if present. + #[inline] + pub fn remove(&mut self, k: &K) -> Option { self.0.remove(k).map(|v| v.value) } +} + +#[cfg(any(test, target_arch = "wasm32"))] +mod tests { + use super::*; + use crate::cross_test; + use crate::executor::Timer; + + crate::cfg_wasm32! { + use wasm_bindgen_test::*; + wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); + } + + cross_test!(test_clear_expired_entries, { + let mut expirable_map = ExpirableMap::new(); + let value = "test_value"; + let exp = Duration::from_secs(1); + + // Insert 2 entries with 1 sec expiration time + expirable_map.insert("key1".to_string(), value.to_string(), exp); + expirable_map.insert("key2".to_string(), value.to_string(), exp); + + // Wait for entries to expire + Timer::sleep(2.).await; + + // Clear expired entries + expirable_map.clear_expired_entries(); + + // We waited for 2 seconds, so we shouldn't have any entry accessible + assert_eq!(expirable_map.0.len(), 0); + + // Insert 5 entries + expirable_map.insert("key1".to_string(), value.to_string(), Duration::from_secs(5)); + expirable_map.insert("key2".to_string(), value.to_string(), Duration::from_secs(4)); + expirable_map.insert("key3".to_string(), value.to_string(), Duration::from_secs(7)); + expirable_map.insert("key4".to_string(), value.to_string(), Duration::from_secs(2)); + expirable_map.insert("key5".to_string(), value.to_string(), Duration::from_millis(3750)); + + // Wait 2 seconds to expire some entries + Timer::sleep(2.).await; + + // Clear expired entries + expirable_map.clear_expired_entries(); + + // We waited for 2 seconds, only one entry should expire + assert_eq!(expirable_map.0.len(), 4); + }); +} diff --git a/mm2src/common/time_cache.rs b/mm2src/common/time_cache.rs index aafa8a6aea..a1c3987ec2 100644 --- a/mm2src/common/time_cache.rs +++ b/mm2src/common/time_cache.rs @@ -28,81 +28,69 @@ use std::collections::hash_map::{self, use std::collections::VecDeque; use std::time::Duration; -#[derive(Debug)] -pub struct ExpiringElement { - /// The element that expires - element: Element, - /// The expire time. - expires: Instant, -} - -impl ExpiringElement { - pub fn get_element(&self) -> &Element { &self.element } - - pub fn update_expiration(&mut self, expires: Instant) { self.expires = expires } -} +use crate::expirable_map::ExpirableEntry; #[derive(Debug)] pub struct TimeCache { /// Mapping a key to its value together with its latest expire time (can be updated through /// reinserts). - map: FnvHashMap>, + map: FnvHashMap>, /// An ordered list of keys by expires time. - list: VecDeque>, + list: VecDeque>, /// The time elements remain in the cache. ttl: Duration, } pub struct OccupiedEntry<'a, K, V> { expiration: Instant, - entry: hash_map::OccupiedEntry<'a, K, ExpiringElement>, - list: &'a mut VecDeque>, + entry: hash_map::OccupiedEntry<'a, K, ExpirableEntry>, + list: &'a mut VecDeque>, } impl<'a, K, V> OccupiedEntry<'a, K, V> where K: Eq + std::hash::Hash + Clone, { - pub fn into_mut(self) -> &'a mut V { &mut self.entry.into_mut().element } + pub fn into_mut(self) -> &'a mut V { &mut self.entry.into_mut().value } #[allow(dead_code)] pub fn insert_without_updating_expiration(&mut self, value: V) -> V { //keep old expiration, only replace value of element - ::std::mem::replace(&mut self.entry.get_mut().element, value) + ::std::mem::replace(&mut self.entry.get_mut().value, value) } #[allow(dead_code)] pub fn insert_and_update_expiration(&mut self, value: V) -> V { //We push back an additional element, the first reference in the list will be ignored // since we also updated the expires in the map, see below. - self.list.push_back(ExpiringElement { - element: self.entry.key().clone(), - expires: self.expiration, + self.list.push_back(ExpirableEntry { + value: self.entry.key().clone(), + expires_at: self.expiration, }); self.entry - .insert(ExpiringElement { - element: value, - expires: self.expiration, + .insert(ExpirableEntry { + value, + expires_at: self.expiration, }) - .element + .value } pub fn into_mut_with_update_expiration(mut self) -> &'a mut V { //We push back an additional element, the first reference in the list will be ignored // since we also updated the expires in the map, see below. - self.list.push_back(ExpiringElement { - element: self.entry.key().clone(), - expires: self.expiration, + self.list.push_back(ExpirableEntry { + value: self.entry.key().clone(), + expires_at: self.expiration, }); self.entry.get_mut().update_expiration(self.expiration); - &mut self.entry.into_mut().element + &mut self.entry.into_mut().value } } pub struct VacantEntry<'a, K, V> { expiration: Instant, - entry: hash_map::VacantEntry<'a, K, ExpiringElement>, - list: &'a mut VecDeque>, + entry: hash_map::VacantEntry<'a, K, ExpirableEntry>, + list: &'a mut VecDeque>, } impl<'a, K, V> VacantEntry<'a, K, V> @@ -110,17 +98,17 @@ where K: Eq + std::hash::Hash + Clone, { pub fn insert(self, value: V) -> &'a mut V { - self.list.push_back(ExpiringElement { - element: self.entry.key().clone(), - expires: self.expiration, + self.list.push_back(ExpirableEntry { + value: self.entry.key().clone(), + expires_at: self.expiration, }); &mut self .entry - .insert(ExpiringElement { - element: value, - expires: self.expiration, + .insert(ExpirableEntry { + value, + expires_at: self.expiration, }) - .element + .value } } @@ -163,12 +151,12 @@ where fn remove_expired_keys(&mut self, now: Instant) { while let Some(element) = self.list.pop_front() { - if element.expires > now { + if element.expires_at > now { self.list.push_front(element); break; } - if let Occupied(entry) = self.map.entry(element.element.clone()) { - if entry.get().expires <= now { + if let Occupied(entry) = self.map.entry(element.value.clone()) { + if entry.get().expires_at <= now { entry.remove(); } } @@ -207,7 +195,7 @@ where // Removes a certain key even if it didn't expire plus removing other expired keys pub fn remove(&mut self, key: Key) -> Option { - let result = self.map.remove(&key).map(|el| el.element); + let result = self.map.remove(&key).map(|el| el.value); self.remove_expired_keys(Instant::now()); result } @@ -221,7 +209,7 @@ where pub fn contains_key(&self, key: &Key) -> bool { self.map.contains_key(key) } - pub fn get(&self, key: &Key) -> Option<&Value> { self.map.get(key).map(|e| &e.element) } + pub fn get(&self, key: &Key) -> Option<&Value> { self.map.get(key).map(|e| &e.value) } pub fn len(&self) -> usize { self.map.len() } @@ -229,9 +217,9 @@ where pub fn ttl(&self) -> Duration { self.ttl } - pub fn iter(&self) -> Iter> { self.map.iter() } + pub fn iter(&self) -> Iter> { self.map.iter() } - pub fn keys(&self) -> Keys> { self.map.keys() } + pub fn keys(&self) -> Keys> { self.map.keys() } } impl TimeCache @@ -242,7 +230,7 @@ where pub fn as_hash_map(&self) -> std::collections::HashMap { self.map .iter() - .map(|(key, expiring_el)| (key.clone(), expiring_el.element.clone())) + .map(|(key, expiring_el)| (key.clone(), expiring_el.value.clone())) .collect() } } diff --git a/mm2src/mm2_main/src/heartbeat_event.rs b/mm2src/mm2_main/src/heartbeat_event.rs new file mode 100644 index 0000000000..9b4cb0809a --- /dev/null +++ b/mm2src/mm2_main/src/heartbeat_event.rs @@ -0,0 +1,52 @@ +use async_trait::async_trait; +use common::{executor::{SpawnFuture, Timer}, + log::info}; +use futures::channel::oneshot::{self, Receiver, Sender}; +use mm2_core::mm_ctx::MmArc; +use mm2_event_stream::{behaviour::{EventBehaviour, EventInitStatus}, + Event, EventStreamConfiguration}; + +pub struct HeartbeatEvent { + ctx: MmArc, +} + +impl HeartbeatEvent { + pub fn new(ctx: MmArc) -> Self { Self { ctx } } +} + +#[async_trait] +impl EventBehaviour for HeartbeatEvent { + const EVENT_NAME: &'static str = "HEARTBEAT"; + + async fn handle(self, interval: f64, tx: oneshot::Sender) { + tx.send(EventInitStatus::Success).unwrap(); + + loop { + self.ctx + .stream_channel_controller + .broadcast(Event::new(Self::EVENT_NAME.to_string(), json!({}).to_string())) + .await; + + Timer::sleep(interval).await; + } + } + + async fn spawn_if_active(self, config: &EventStreamConfiguration) -> EventInitStatus { + if let Some(event) = config.get_event(Self::EVENT_NAME) { + info!( + "{} event is activated with {} seconds interval.", + Self::EVENT_NAME, + event.stream_interval_seconds + ); + + let (tx, rx): (Sender, Receiver) = oneshot::channel(); + self.ctx.spawner().spawn(self.handle(event.stream_interval_seconds, tx)); + + rx.await.unwrap_or_else(|e| { + EventInitStatus::Failed(format!("Event initialization status must be received: {}", e)) + }) + } else { + EventInitStatus::Inactive + } + } +} diff --git a/mm2src/mm2_main/src/lp_native_dex.rs b/mm2src/mm2_main/src/lp_native_dex.rs index b5e9ef131a..50c9f13c83 100644 --- a/mm2src/mm2_main/src/lp_native_dex.rs +++ b/mm2src/mm2_main/src/lp_native_dex.rs @@ -46,6 +46,7 @@ use std::{fs, usize}; #[cfg(not(target_arch = "wasm32"))] use crate::mm2::database::init_and_migrate_sql_db; +use crate::mm2::heartbeat_event::HeartbeatEvent; use crate::mm2::lp_message_service::{init_message_service, InitMessageServiceError}; use crate::mm2::lp_network::{lp_network_ports, p2p_event_process_loop, NetIdError}; use crate::mm2::lp_ordermatch::{broadcast_maker_orders_keep_alive_loop, clean_memory_loop, init_ordermatch_context, @@ -206,6 +207,8 @@ pub enum MmInitError { InvalidPassphrase(String), #[display(fmt = "NETWORK event initialization failed: {}", _0)] NetworkEventInitFailed(String), + #[display(fmt = "HEARTBEAT event initialization failed: {}", _0)] + HeartbeatEventInitFailed(String), #[from_trait(WithHwRpcError::hw_rpc_error)] #[display(fmt = "{}", _0)] HwError(HwRpcError), @@ -436,6 +439,10 @@ async fn init_event_streaming(ctx: &MmArc) -> MmInitResult<()> { if let EventInitStatus::Failed(err) = NetworkEvent::new(ctx.clone()).spawn_if_active(config).await { return MmError::err(MmInitError::NetworkEventInitFailed(err)); } + + if let EventInitStatus::Failed(err) = HeartbeatEvent::new(ctx.clone()).spawn_if_active(config).await { + return MmError::err(MmInitError::HeartbeatEventInitFailed(err)); + } } Ok(()) diff --git a/mm2src/mm2_main/src/mm2.rs b/mm2src/mm2_main/src/mm2.rs index a6310ba180..49c713ae43 100644 --- a/mm2src/mm2_main/src/mm2.rs +++ b/mm2src/mm2_main/src/mm2.rs @@ -58,6 +58,7 @@ use mm2_err_handle::prelude::*; #[path = "database.rs"] pub mod database; +#[path = "heartbeat_event.rs"] pub mod heartbeat_event; #[path = "lp_dispatcher.rs"] pub mod lp_dispatcher; #[path = "lp_message_service.rs"] pub mod lp_message_service; #[path = "lp_network.rs"] pub mod lp_network; diff --git a/mm2src/mm2_net/src/transport.rs b/mm2src/mm2_net/src/transport.rs index 27c039d556..1dc4d032c3 100644 --- a/mm2src/mm2_net/src/transport.rs +++ b/mm2src/mm2_net/src/transport.rs @@ -77,7 +77,7 @@ pub struct GuiAuthValidationGenerator { } /// gui-auth specific data-type that needed in order to perform gui-auth calls -#[derive(Serialize, Clone)] +#[derive(Clone, Serialize)] pub struct GuiAuthValidation { pub coin_ticker: String, pub address: String,