From 954b2f58f15fb3a2683acd209eea02d044b85268 Mon Sep 17 00:00:00 2001 From: shamardy Date: Wed, 6 Nov 2024 19:41:44 +0200 Subject: [PATCH 1/2] make `wait_for_htlc_tx_spend` async --- mm2src/coins/eth.rs | 128 +++++++++--------- mm2src/coins/eth/eth_tests.rs | 2 +- mm2src/coins/lightning.rs | 83 ++++++------ mm2src/coins/lightning/ln_platform.rs | 1 - mm2src/coins/lp_coins.rs | 8 +- mm2src/coins/qrc20.rs | 22 +-- mm2src/coins/qrc20/qrc20_tests.rs | 2 +- mm2src/coins/siacoin.rs | 2 +- mm2src/coins/tendermint/tendermint_coin.rs | 89 ++++++------ mm2src/coins/tendermint/tendermint_token.rs | 22 +-- mm2src/coins/test_coin.rs | 2 +- mm2src/coins/utxo/bch.rs | 3 +- mm2src/coins/utxo/qtum.rs | 3 +- mm2src/coins/utxo/slp.rs | 3 +- mm2src/coins/utxo/utxo_common.rs | 17 +-- mm2src/coins/utxo/utxo_standard.rs | 3 +- mm2src/coins/utxo/utxo_tests.rs | 4 +- mm2src/coins/z_coin.rs | 3 +- mm2src/mm2_main/src/lp_swap/swap_watcher.rs | 2 +- mm2src/mm2_main/src/lp_swap/taker_swap.rs | 22 +-- .../tests/docker_tests/qrc20_tests.rs | 4 +- 21 files changed, 202 insertions(+), 223 deletions(-) diff --git a/mm2src/coins/eth.rs b/mm2src/coins/eth.rs index 170091a453..71b0fbe0c9 100644 --- a/mm2src/coins/eth.rs +++ b/mm2src/coins/eth.rs @@ -2349,18 +2349,18 @@ impl MarketCoinOps for EthCoin { Box::new(fut.boxed().compat()) } - fn wait_for_htlc_tx_spend(&self, args: WaitForHTLCTxSpendArgs<'_>) -> TransactionFut { - let unverified: UnverifiedTransactionWrapper = try_tx_fus!(rlp::decode(args.tx_bytes)); - let tx = try_tx_fus!(SignedEthTx::new(unverified)); + async fn wait_for_htlc_tx_spend(&self, args: WaitForHTLCTxSpendArgs<'_>) -> TransactionResult { + let unverified: UnverifiedTransactionWrapper = try_tx_s!(rlp::decode(args.tx_bytes)); + let tx = try_tx_s!(SignedEthTx::new(unverified)); let swap_contract_address = match args.swap_contract_address { - Some(addr) => try_tx_fus!(addr.try_to_address()), + Some(addr) => try_tx_s!(addr.try_to_address()), None => match tx.unsigned().action() { Call(address) => *address, Create => { - return Box::new(futures01::future::err(TransactionErr::Plain(ERRL!( + return Err(TransactionErr::Plain(ERRL!( "Invalid payment action: the payment action cannot be create" - )))) + ))) }, }, }; @@ -2369,85 +2369,79 @@ impl MarketCoinOps for EthCoin { EthCoinType::Eth => get_function_name("ethPayment", args.watcher_reward), EthCoinType::Erc20 { .. } => get_function_name("erc20Payment", args.watcher_reward), EthCoinType::Nft { .. } => { - return Box::new(futures01::future::err(TransactionErr::ProtocolNotSupported(ERRL!( + return Err(TransactionErr::ProtocolNotSupported(ERRL!( "Nft Protocol is not supported yet!" - )))) + ))) }, }; - let payment_func = try_tx_fus!(SWAP_CONTRACT.function(&func_name)); - let decoded = try_tx_fus!(decode_contract_call(payment_func, tx.unsigned().data())); + let payment_func = try_tx_s!(SWAP_CONTRACT.function(&func_name)); + let decoded = try_tx_s!(decode_contract_call(payment_func, tx.unsigned().data())); let id = match decoded.first() { Some(Token::FixedBytes(bytes)) => bytes.clone(), invalid_token => { - return Box::new(futures01::future::err(TransactionErr::Plain(ERRL!( + return Err(TransactionErr::Plain(ERRL!( "Expected Token::FixedBytes, got {:?}", invalid_token - )))) + ))) }, }; - let selfi = self.clone(); - let from_block = args.from_block; - let wait_until = args.wait_until; - let check_every = args.check_every; - let fut = async move { - loop { - if now_sec() > wait_until { - return TX_PLAIN_ERR!( - "Waited too long until {} for transaction {:?} to be spent ", - wait_until, - tx, - ); - } - let current_block = match selfi.current_block().compat().await { - Ok(b) => b, - Err(e) => { - error!("Error getting block number: {}", e); - Timer::sleep(5.).await; - continue; - }, - }; + loop { + if now_sec() > args.wait_until { + return TX_PLAIN_ERR!( + "Waited too long until {} for transaction {:?} to be spent ", + args.wait_until, + tx, + ); + } - let events = match selfi - .spend_events(swap_contract_address, from_block, current_block) - .compat() - .await - { - Ok(ev) => ev, - Err(e) => { - error!("Error getting spend events: {}", e); - Timer::sleep(5.).await; - continue; - }, - }; + let current_block = match self.current_block().compat().await { + Ok(b) => b, + Err(e) => { + error!("Error getting block number: {}", e); + Timer::sleep(5.).await; + continue; + }, + }; - let found = events.iter().find(|event| &event.data.0[..32] == id.as_slice()); + let events = match self + .spend_events(swap_contract_address, args.from_block, current_block) + .compat() + .await + { + Ok(ev) => ev, + Err(e) => { + error!("Error getting spend events: {}", e); + Timer::sleep(5.).await; + continue; + }, + }; - if let Some(event) = found { - if let Some(tx_hash) = event.transaction_hash { - let transaction = match selfi.transaction(TransactionId::Hash(tx_hash)).await { - Ok(Some(t)) => t, - Ok(None) => { - info!("Tx {} not found yet", tx_hash); - Timer::sleep(check_every).await; - continue; - }, - Err(e) => { - error!("Get tx {} error: {}", tx_hash, e); - Timer::sleep(check_every).await; - continue; - }, - }; + let found = events.iter().find(|event| &event.data.0[..32] == id.as_slice()); - return Ok(TransactionEnum::from(try_tx_s!(signed_tx_from_web3_tx(transaction)))); - } - } + if let Some(event) = found { + if let Some(tx_hash) = event.transaction_hash { + let transaction = match self.transaction(TransactionId::Hash(tx_hash)).await { + Ok(Some(t)) => t, + Ok(None) => { + info!("Tx {} not found yet", tx_hash); + Timer::sleep(args.check_every).await; + continue; + }, + Err(e) => { + error!("Get tx {} error: {}", tx_hash, e); + Timer::sleep(args.check_every).await; + continue; + }, + }; - Timer::sleep(5.).await; + return Ok(TransactionEnum::from(try_tx_s!(signed_tx_from_web3_tx(transaction)))); + } } - }; - Box::new(fut.boxed().compat()) + + Timer::sleep(5.).await; + } } fn tx_enum_from_bytes(&self, bytes: &[u8]) -> Result> { diff --git a/mm2src/coins/eth/eth_tests.rs b/mm2src/coins/eth/eth_tests.rs index c7f1e51d13..19f45d10ae 100644 --- a/mm2src/coins/eth/eth_tests.rs +++ b/mm2src/coins/eth/eth_tests.rs @@ -191,7 +191,7 @@ fn test_wait_for_payment_spend_timeout() { 184, 42, 106, ]; - assert!(block_on_f01(coin.wait_for_htlc_tx_spend(WaitForHTLCTxSpendArgs { + assert!(block_on(coin.wait_for_htlc_tx_spend(WaitForHTLCTxSpendArgs { tx_bytes: &tx_bytes, secret_hash: &[], wait_until, diff --git a/mm2src/coins/lightning.rs b/mm2src/coins/lightning.rs index ce97c96cb3..67b27ba8f4 100644 --- a/mm2src/coins/lightning.rs +++ b/mm2src/coins/lightning.rs @@ -1164,58 +1164,53 @@ impl MarketCoinOps for LightningCoin { Box::new(fut.boxed().compat()) } - fn wait_for_htlc_tx_spend(&self, args: WaitForHTLCTxSpendArgs<'_>) -> TransactionFut { - let payment_hash = try_tx_fus!(payment_hash_from_slice(args.tx_bytes)); + async fn wait_for_htlc_tx_spend(&self, args: WaitForHTLCTxSpendArgs<'_>) -> TransactionResult { + let payment_hash = try_tx_s!(payment_hash_from_slice(args.tx_bytes)); let payment_hex = hex::encode(payment_hash.0); - let coin = self.clone(); - let wait_until = args.wait_until; - let fut = async move { - loop { - if now_sec() > wait_until { - return Err(TransactionErr::Plain(ERRL!( - "Waited too long until {} for payment {} to be spent", - wait_until, - payment_hex - ))); - } + loop { + if now_sec() > args.wait_until { + return Err(TransactionErr::Plain(ERRL!( + "Waited too long until {} for payment {} to be spent", + args.wait_until, + payment_hex + ))); + } - match coin.db.get_payment_from_db(payment_hash).await { - Ok(Some(payment)) => match payment.status { - HTLCStatus::Pending => (), - HTLCStatus::Claimable => { - return Err(TransactionErr::Plain(ERRL!( - "Payment {} has an invalid status of {} in the db", - payment_hex, - payment.status - ))) - }, - HTLCStatus::Succeeded => return Ok(TransactionEnum::LightningPayment(payment_hash)), - HTLCStatus::Failed => { - return Err(TransactionErr::Plain(ERRL!( - "Lightning swap payment {} failed", - payment_hex - ))) - }, - }, - Ok(None) => return Err(TransactionErr::Plain(ERRL!("Payment {} not found in DB", payment_hex))), - Err(e) => { + match self.db.get_payment_from_db(payment_hash).await { + Ok(Some(payment)) => match payment.status { + HTLCStatus::Pending => (), + HTLCStatus::Claimable => { return Err(TransactionErr::Plain(ERRL!( - "Error getting payment {} from db: {}", + "Payment {} has an invalid status of {} in the db", payment_hex, - e + payment.status ))) }, - } - - // note: When sleeping for only 1 second the test_send_payment_and_swaps unit test took 20 seconds to complete instead of 37 seconds when sleeping for 10 seconds - // Todo: In next sprints, should add a mutex for lightning swap payments to avoid overloading the shared db connection with requests when the sleep time is reduced and multiple swaps are ran together. - // Todo: The aim is to make lightning swap payments as fast as possible, more sleep time can be allowed for maker payment since it waits for the secret to be revealed on another chain first. - // Todo: Running swap payments statuses should be loaded from db on restarts in this case. - Timer::sleep(10.).await; + HTLCStatus::Succeeded => return Ok(TransactionEnum::LightningPayment(payment_hash)), + HTLCStatus::Failed => { + return Err(TransactionErr::Plain(ERRL!( + "Lightning swap payment {} failed", + payment_hex + ))) + }, + }, + Ok(None) => return Err(TransactionErr::Plain(ERRL!("Payment {} not found in DB", payment_hex))), + Err(e) => { + return Err(TransactionErr::Plain(ERRL!( + "Error getting payment {} from db: {}", + payment_hex, + e + ))) + }, } - }; - Box::new(fut.boxed().compat()) + + // note: When sleeping for only 1 second the test_send_payment_and_swaps unit test took 20 seconds to complete instead of 37 seconds when sleeping for 10 seconds + // Todo: In next sprints, should add a mutex for lightning swap payments to avoid overloading the shared db connection with requests when the sleep time is reduced and multiple swaps are ran together. + // Todo: The aim is to make lightning swap payments as fast as possible, more sleep time can be allowed for maker payment since it waits for the secret to be revealed on another chain first. + // Todo: Running swap payments statuses should be loaded from db on restarts in this case. + Timer::sleep(10.).await; + } } fn tx_enum_from_bytes(&self, bytes: &[u8]) -> Result> { diff --git a/mm2src/coins/lightning/ln_platform.rs b/mm2src/coins/lightning/ln_platform.rs index 59e3e19488..a3a4c22776 100644 --- a/mm2src/coins/lightning/ln_platform.rs +++ b/mm2src/coins/lightning/ln_platform.rs @@ -542,7 +542,6 @@ impl Platform { check_every: TAKER_PAYMENT_SPEND_SEARCH_INTERVAL, watcher_reward: false, }) - .compat() .await .map_to_mm(|e| SaveChannelClosingError::WaitForFundingTxSpendError(e.get_plain_text_format()))?; diff --git a/mm2src/coins/lp_coins.rs b/mm2src/coins/lp_coins.rs index cce84e99af..719156834b 100644 --- a/mm2src/coins/lp_coins.rs +++ b/mm2src/coins/lp_coins.rs @@ -2033,7 +2033,13 @@ pub trait MarketCoinOps { fn wait_for_confirmations(&self, input: ConfirmPaymentInput) -> Box + Send>; - fn wait_for_htlc_tx_spend(&self, args: WaitForHTLCTxSpendArgs<'_>) -> TransactionFut; + /// Waits for spending/unlocking of funds locked in a HTLC construction specific to the coin's + /// chain. Implementation should monitor locked funds (UTXO/contract/etc.) until funds are + /// spent/unlocked or timeout is reached. + /// + /// Returns spending tx/event from mempool/pending state to allow prompt extraction of preimage + /// secret. + async fn wait_for_htlc_tx_spend(&self, args: WaitForHTLCTxSpendArgs<'_>) -> TransactionResult; fn tx_enum_from_bytes(&self, bytes: &[u8]) -> Result>; diff --git a/mm2src/coins/qrc20.rs b/mm2src/coins/qrc20.rs index 3f86b78539..df14fea09a 100644 --- a/mm2src/coins/qrc20.rs +++ b/mm2src/coins/qrc20.rs @@ -1254,23 +1254,11 @@ impl MarketCoinOps for Qrc20Coin { Box::new(fut.boxed().compat()) } - fn wait_for_htlc_tx_spend(&self, args: WaitForHTLCTxSpendArgs<'_>) -> TransactionFut { - let tx: UtxoTx = try_tx_fus!(deserialize(args.tx_bytes).map_err(|e| ERRL!("{:?}", e))); - - let selfi = self.clone(); - let WaitForHTLCTxSpendArgs { - check_every, - from_block, - wait_until, - .. - } = args; - let fut = async move { - selfi - .wait_for_tx_spend_impl(tx, wait_until, from_block, check_every) - .map_err(TransactionErr::Plain) - .await - }; - Box::new(fut.boxed().compat()) + async fn wait_for_htlc_tx_spend(&self, args: WaitForHTLCTxSpendArgs<'_>) -> TransactionResult { + let tx: UtxoTx = try_tx_s!(deserialize(args.tx_bytes).map_err(|e| ERRL!("{:?}", e))); + self.wait_for_tx_spend_impl(tx, args.wait_until, args.from_block, args.check_every) + .map_err(TransactionErr::Plain) + .await } fn tx_enum_from_bytes(&self, bytes: &[u8]) -> Result> { diff --git a/mm2src/coins/qrc20/qrc20_tests.rs b/mm2src/coins/qrc20/qrc20_tests.rs index 930f078c53..3e6dbc94dd 100644 --- a/mm2src/coins/qrc20/qrc20_tests.rs +++ b/mm2src/coins/qrc20/qrc20_tests.rs @@ -453,7 +453,7 @@ fn test_wait_for_tx_spend_malicious() { let payment_tx = hex::decode("01000000016601daa208531d20532c460d0c86b74a275f4a126bbffcf4eafdf33835af2859010000006a47304402205825657548bc1b5acf3f4bb2f89635a02b04f3228cd08126e63c5834888e7ac402207ca05fa0a629a31908a97a508e15076e925f8e621b155312b7526a6666b06a76012103693bff1b39e8b5a306810023c29b95397eb395530b106b1820ea235fd81d9ce9ffffffff020000000000000000e35403a0860101284cc49b415b2a8620ad3b72361a5aeba5dffd333fb64750089d935a1ec974d6a91ef4f24ff6ba0000000000000000000000000000000000000000000000000000000001312d00000000000000000000000000d362e096e873eb7907e205fadc6175c6fec7bc44000000000000000000000000783cf0be521101942da509846ea476e683aad8324b6b2e5444c2639cc0fb7bcea5afba3f3cdce239000000000000000000000000000000000000000000000000000000000000000000000000000000005f855c7614ba8b71f3544b93e2f681f996da519a98ace0107ac2203de400000000001976a9149e032d4b0090a11dc40fe6c47601499a35d55fbb88ac415d855f").unwrap(); let wait_until = now_sec() + 1; let from_block = 696245; - let found = block_on_f01(coin.wait_for_htlc_tx_spend(WaitForHTLCTxSpendArgs { + let found = block_on(coin.wait_for_htlc_tx_spend(WaitForHTLCTxSpendArgs { tx_bytes: &payment_tx, secret_hash: &[], wait_until, diff --git a/mm2src/coins/siacoin.rs b/mm2src/coins/siacoin.rs index 1bd8ef6c2d..bc57aaaf10 100644 --- a/mm2src/coins/siacoin.rs +++ b/mm2src/coins/siacoin.rs @@ -370,7 +370,7 @@ impl MarketCoinOps for SiaCoin { unimplemented!() } - fn wait_for_htlc_tx_spend(&self, _args: WaitForHTLCTxSpendArgs<'_>) -> TransactionFut { unimplemented!() } + async fn wait_for_htlc_tx_spend(&self, _args: WaitForHTLCTxSpendArgs<'_>) -> TransactionResult { unimplemented!() } fn tx_enum_from_bytes(&self, _bytes: &[u8]) -> Result> { MmError::err(TxMarshalingErr::NotSupported( diff --git a/mm2src/coins/tendermint/tendermint_coin.rs b/mm2src/coins/tendermint/tendermint_coin.rs index 5b4e7953a7..fd1396724f 100644 --- a/mm2src/coins/tendermint/tendermint_coin.rs +++ b/mm2src/coins/tendermint/tendermint_coin.rs @@ -2596,14 +2596,14 @@ impl MarketCoinOps for TendermintCoin { Box::new(fut.boxed().compat()) } - fn wait_for_htlc_tx_spend(&self, args: WaitForHTLCTxSpendArgs<'_>) -> TransactionFut { - let tx = try_tx_fus!(cosmrs::Tx::from_bytes(args.tx_bytes)); - let first_message = try_tx_fus!(tx.body.messages.first().ok_or("Tx body couldn't be read.")); - let htlc_proto = try_tx_fus!(CreateHtlcProto::decode( - try_tx_fus!(HtlcType::from_str(&self.account_prefix)), + async fn wait_for_htlc_tx_spend(&self, args: WaitForHTLCTxSpendArgs<'_>) -> TransactionResult { + let tx = try_tx_s!(cosmrs::Tx::from_bytes(args.tx_bytes)); + let first_message = try_tx_s!(tx.body.messages.first().ok_or("Tx body couldn't be read.")); + let htlc_proto = try_tx_s!(CreateHtlcProto::decode( + try_tx_s!(HtlcType::from_str(&self.account_prefix)), first_message.value.as_slice() )); - let htlc = try_tx_fus!(CreateHtlcMsg::try_from(htlc_proto)); + let htlc = try_tx_s!(CreateHtlcMsg::try_from(htlc_proto)); let htlc_id = self.calculate_htlc_id(htlc.sender(), htlc.to(), htlc.amount(), args.secret_hash); let events_string = format!("claim_htlc.id='{}'", htlc_id); @@ -2618,38 +2618,32 @@ impl MarketCoinOps for TendermintCoin { }; let encoded_request = request.encode_to_vec(); - let coin = self.clone(); - let wait_until = args.wait_until; - let fut = async move { - loop { - let response = try_tx_s!( - try_tx_s!(coin.rpc_client().await) - .abci_query( - Some(ABCI_GET_TXS_EVENT_PATH.to_string()), - encoded_request.as_slice(), - ABCI_REQUEST_HEIGHT, - ABCI_REQUEST_PROVE - ) - .await - ); - let response = try_tx_s!(GetTxsEventResponse::decode(response.value.as_slice())); - if let Some(tx) = response.txs.first() { - return Ok(TransactionEnum::CosmosTransaction(CosmosTransaction { - data: TxRaw { - body_bytes: tx.body.as_ref().map(Message::encode_to_vec).unwrap_or_default(), - auth_info_bytes: tx.auth_info.as_ref().map(Message::encode_to_vec).unwrap_or_default(), - signatures: tx.signatures.clone(), - }, - })); - } - Timer::sleep(5.).await; - if get_utc_timestamp() > wait_until as i64 { - return Err(TransactionErr::Plain("Waited too long".into())); - } + loop { + let response = try_tx_s!( + try_tx_s!(self.rpc_client().await) + .abci_query( + Some(ABCI_GET_TXS_EVENT_PATH.to_string()), + encoded_request.as_slice(), + ABCI_REQUEST_HEIGHT, + ABCI_REQUEST_PROVE + ) + .await + ); + let response = try_tx_s!(GetTxsEventResponse::decode(response.value.as_slice())); + if let Some(tx) = response.txs.first() { + return Ok(TransactionEnum::CosmosTransaction(CosmosTransaction { + data: TxRaw { + body_bytes: tx.body.as_ref().map(Message::encode_to_vec).unwrap_or_default(), + auth_info_bytes: tx.auth_info.as_ref().map(Message::encode_to_vec).unwrap_or_default(), + signatures: tx.signatures.clone(), + }, + })); } - }; - - Box::new(fut.boxed().compat()) + Timer::sleep(5.).await; + if get_utc_timestamp() > args.wait_until as i64 { + return Err(TransactionErr::Plain("Waited too long".into())); + } + } } fn tx_enum_from_bytes(&self, bytes: &[u8]) -> Result> { @@ -3637,18 +3631,15 @@ pub mod tendermint_coin_tests { let encoded_tx = tx.encode_to_vec(); let secret_hash = hex::decode("0C34C71EBA2A51738699F9F3D6DAFFB15BE576E8ED543203485791B5DA39D10D").unwrap(); - let spend_tx = block_on( - coin.wait_for_htlc_tx_spend(WaitForHTLCTxSpendArgs { - tx_bytes: &encoded_tx, - secret_hash: &secret_hash, - wait_until: get_utc_timestamp() as u64, - from_block: 0, - swap_contract_address: &None, - check_every: TAKER_PAYMENT_SPEND_SEARCH_INTERVAL, - watcher_reward: false, - }) - .compat(), - ) + let spend_tx = block_on(coin.wait_for_htlc_tx_spend(WaitForHTLCTxSpendArgs { + tx_bytes: &encoded_tx, + secret_hash: &secret_hash, + wait_until: get_utc_timestamp() as u64, + from_block: 0, + swap_contract_address: &None, + check_every: TAKER_PAYMENT_SPEND_SEARCH_INTERVAL, + watcher_reward: false, + })) .unwrap(); // https://nyancat.iobscan.io/#/tx?txHash=565C820C1F95556ADC251F16244AAD4E4274772F41BC13F958C9C2F89A14D137 diff --git a/mm2src/coins/tendermint/tendermint_token.rs b/mm2src/coins/tendermint/tendermint_token.rs index a2df5bc117..3ddee75ebc 100644 --- a/mm2src/coins/tendermint/tendermint_token.rs +++ b/mm2src/coins/tendermint/tendermint_token.rs @@ -453,16 +453,18 @@ impl MarketCoinOps for TendermintToken { self.platform_coin.wait_for_confirmations(input) } - fn wait_for_htlc_tx_spend(&self, args: WaitForHTLCTxSpendArgs<'_>) -> TransactionFut { - self.platform_coin.wait_for_htlc_tx_spend(WaitForHTLCTxSpendArgs { - tx_bytes: args.tx_bytes, - secret_hash: args.secret_hash, - wait_until: args.wait_until, - from_block: args.from_block, - swap_contract_address: args.swap_contract_address, - check_every: args.check_every, - watcher_reward: false, - }) + async fn wait_for_htlc_tx_spend(&self, args: WaitForHTLCTxSpendArgs<'_>) -> TransactionResult { + self.platform_coin + .wait_for_htlc_tx_spend(WaitForHTLCTxSpendArgs { + tx_bytes: args.tx_bytes, + secret_hash: args.secret_hash, + wait_until: args.wait_until, + from_block: args.from_block, + swap_contract_address: args.swap_contract_address, + check_every: args.check_every, + watcher_reward: false, + }) + .await } fn tx_enum_from_bytes(&self, bytes: &[u8]) -> Result> { diff --git a/mm2src/coins/test_coin.rs b/mm2src/coins/test_coin.rs index 8f1dd0a508..b558d20789 100644 --- a/mm2src/coins/test_coin.rs +++ b/mm2src/coins/test_coin.rs @@ -92,7 +92,7 @@ impl MarketCoinOps for TestCoin { unimplemented!() } - fn wait_for_htlc_tx_spend(&self, args: WaitForHTLCTxSpendArgs<'_>) -> TransactionFut { unimplemented!() } + async fn wait_for_htlc_tx_spend(&self, args: WaitForHTLCTxSpendArgs<'_>) -> TransactionResult { unimplemented!() } fn tx_enum_from_bytes(&self, _bytes: &[u8]) -> Result> { MmError::err(TxMarshalingErr::NotSupported( diff --git a/mm2src/coins/utxo/bch.rs b/mm2src/coins/utxo/bch.rs index a700ec0b5a..d71b6538e3 100644 --- a/mm2src/coins/utxo/bch.rs +++ b/mm2src/coins/utxo/bch.rs @@ -1251,7 +1251,7 @@ impl MarketCoinOps for BchCoin { utxo_common::wait_for_confirmations(&self.utxo_arc, input) } - fn wait_for_htlc_tx_spend(&self, args: WaitForHTLCTxSpendArgs<'_>) -> TransactionFut { + async fn wait_for_htlc_tx_spend(&self, args: WaitForHTLCTxSpendArgs<'_>) -> TransactionResult { utxo_common::wait_for_output_spend( self.clone(), args.tx_bytes, @@ -1260,6 +1260,7 @@ impl MarketCoinOps for BchCoin { args.wait_until, args.check_every, ) + .await } fn tx_enum_from_bytes(&self, bytes: &[u8]) -> Result> { diff --git a/mm2src/coins/utxo/qtum.rs b/mm2src/coins/utxo/qtum.rs index 106258ea55..c5fbc67293 100644 --- a/mm2src/coins/utxo/qtum.rs +++ b/mm2src/coins/utxo/qtum.rs @@ -871,7 +871,7 @@ impl MarketCoinOps for QtumCoin { utxo_common::wait_for_confirmations(&self.utxo_arc, input) } - fn wait_for_htlc_tx_spend(&self, args: WaitForHTLCTxSpendArgs<'_>) -> TransactionFut { + async fn wait_for_htlc_tx_spend(&self, args: WaitForHTLCTxSpendArgs<'_>) -> TransactionResult { utxo_common::wait_for_output_spend( self.clone(), args.tx_bytes, @@ -880,6 +880,7 @@ impl MarketCoinOps for QtumCoin { args.wait_until, args.check_every, ) + .await } fn tx_enum_from_bytes(&self, bytes: &[u8]) -> Result> { diff --git a/mm2src/coins/utxo/slp.rs b/mm2src/coins/utxo/slp.rs index b1e7f1ef4c..cbc7780a34 100644 --- a/mm2src/coins/utxo/slp.rs +++ b/mm2src/coins/utxo/slp.rs @@ -1189,7 +1189,7 @@ impl MarketCoinOps for SlpToken { self.platform_coin.wait_for_confirmations(input) } - fn wait_for_htlc_tx_spend(&self, args: WaitForHTLCTxSpendArgs<'_>) -> TransactionFut { + async fn wait_for_htlc_tx_spend(&self, args: WaitForHTLCTxSpendArgs<'_>) -> TransactionResult { utxo_common::wait_for_output_spend( self.clone(), args.tx_bytes, @@ -1198,6 +1198,7 @@ impl MarketCoinOps for SlpToken { args.wait_until, args.check_every, ) + .await } fn tx_enum_from_bytes(&self, bytes: &[u8]) -> Result> { diff --git a/mm2src/coins/utxo/utxo_common.rs b/mm2src/coins/utxo/utxo_common.rs index 91116e109a..37955e9030 100644 --- a/mm2src/coins/utxo/utxo_common.rs +++ b/mm2src/coins/utxo/utxo_common.rs @@ -2904,24 +2904,21 @@ pub async fn wait_for_output_spend_impl( } } -pub fn wait_for_output_spend + Send + Sync + 'static>( +pub async fn wait_for_output_spend + Send + Sync + 'static>( coin: T, tx_bytes: &[u8], output_index: usize, from_block: u64, wait_until: u64, check_every: f64, -) -> TransactionFut { - let mut tx: UtxoTx = try_tx_fus!(deserialize(tx_bytes).map_err(|e| ERRL!("{:?}", e))); +) -> TransactionResult { + let mut tx: UtxoTx = try_tx_s!(deserialize(tx_bytes).map_err(|e| ERRL!("{:?}", e))); tx.tx_hash_algo = coin.as_ref().tx_hash_algo; - let fut = async move { - wait_for_output_spend_impl(coin.as_ref(), &tx, output_index, from_block, wait_until, check_every) - .await - .map(|tx| tx.into()) - .map_err(|e| TransactionErr::Plain(format!("{:?}", e))) - }; - Box::new(fut.boxed().compat()) + wait_for_output_spend_impl(coin.as_ref(), &tx, output_index, from_block, wait_until, check_every) + .await + .map(|tx| tx.into()) + .map_err(|e| TransactionErr::Plain(format!("{:?}", e))) } pub fn tx_enum_from_bytes(coin: &UtxoCoinFields, bytes: &[u8]) -> Result> { diff --git a/mm2src/coins/utxo/utxo_standard.rs b/mm2src/coins/utxo/utxo_standard.rs index d72c3d4963..02c559caf1 100644 --- a/mm2src/coins/utxo/utxo_standard.rs +++ b/mm2src/coins/utxo/utxo_standard.rs @@ -944,7 +944,7 @@ impl MarketCoinOps for UtxoStandardCoin { utxo_common::wait_for_confirmations(&self.utxo_arc, input) } - fn wait_for_htlc_tx_spend(&self, args: WaitForHTLCTxSpendArgs<'_>) -> TransactionFut { + async fn wait_for_htlc_tx_spend(&self, args: WaitForHTLCTxSpendArgs<'_>) -> TransactionResult { utxo_common::wait_for_output_spend( self.clone(), args.tx_bytes, @@ -953,6 +953,7 @@ impl MarketCoinOps for UtxoStandardCoin { args.wait_until, args.check_every, ) + .await } fn tx_enum_from_bytes(&self, bytes: &[u8]) -> Result> { diff --git a/mm2src/coins/utxo/utxo_tests.rs b/mm2src/coins/utxo/utxo_tests.rs index 373fd29305..bcd7cc991f 100644 --- a/mm2src/coins/utxo/utxo_tests.rs +++ b/mm2src/coins/utxo/utxo_tests.rs @@ -437,7 +437,7 @@ fn test_wait_for_payment_spend_timeout_native() { let wait_until = now_sec() - 1; let from_block = 1000; - assert!(block_on_f01(coin.wait_for_htlc_tx_spend(WaitForHTLCTxSpendArgs { + assert!(block_on(coin.wait_for_htlc_tx_spend(WaitForHTLCTxSpendArgs { tx_bytes: &transaction, secret_hash: &[], wait_until, @@ -492,7 +492,7 @@ fn test_wait_for_payment_spend_timeout_electrum() { let wait_until = now_sec() - 1; let from_block = 1000; - assert!(block_on_f01(coin.wait_for_htlc_tx_spend(WaitForHTLCTxSpendArgs { + assert!(block_on(coin.wait_for_htlc_tx_spend(WaitForHTLCTxSpendArgs { tx_bytes: &transaction, secret_hash: &[], wait_until, diff --git a/mm2src/coins/z_coin.rs b/mm2src/coins/z_coin.rs index 07462d2a07..d8082ea73f 100644 --- a/mm2src/coins/z_coin.rs +++ b/mm2src/coins/z_coin.rs @@ -1174,7 +1174,7 @@ impl MarketCoinOps for ZCoin { utxo_common::wait_for_confirmations(self.as_ref(), input) } - fn wait_for_htlc_tx_spend(&self, args: WaitForHTLCTxSpendArgs<'_>) -> TransactionFut { + async fn wait_for_htlc_tx_spend(&self, args: WaitForHTLCTxSpendArgs<'_>) -> TransactionResult { utxo_common::wait_for_output_spend( self.clone(), args.tx_bytes, @@ -1183,6 +1183,7 @@ impl MarketCoinOps for ZCoin { args.wait_until, args.check_every, ) + .await } fn tx_enum_from_bytes(&self, bytes: &[u8]) -> Result> { diff --git a/mm2src/mm2_main/src/lp_swap/swap_watcher.rs b/mm2src/mm2_main/src/lp_swap/swap_watcher.rs index 16e3712f16..be33003345 100644 --- a/mm2src/mm2_main/src/lp_swap/swap_watcher.rs +++ b/mm2src/mm2_main/src/lp_swap/swap_watcher.rs @@ -356,7 +356,7 @@ impl State for WaitForTakerPaymentSpend { watcher_reward: watcher_ctx.watcher_reward, }); - if f.compat().await.is_ok() { + if f.await.is_ok() { info!("{}", MAKER_PAYMENT_SPEND_FOUND_LOG); return Self::change_state(Stopped::from_reason(StopReason::Finished( WatcherSuccess::MakerPaymentSpentByTaker, diff --git a/mm2src/mm2_main/src/lp_swap/taker_swap.rs b/mm2src/mm2_main/src/lp_swap/taker_swap.rs index 4216408898..c7b1cf59a9 100644 --- a/mm2src/mm2_main/src/lp_swap/taker_swap.rs +++ b/mm2src/mm2_main/src/lp_swap/taker_swap.rs @@ -1685,7 +1685,7 @@ impl TakerSwap { async fn wait_for_taker_payment_spend(&self) -> Result<(Option, Vec), String> { const BROADCAST_MSG_INTERVAL_SEC: f64 = 600.; - let tx_hex = self.r().taker_payment.as_ref().unwrap().tx_hex.0.clone(); + let tx_hex = self.r().taker_payment.as_ref().unwrap().tx_hex.clone(); let mut watcher_broadcast_abort_handle = None; // Watchers cannot be used for lightning swaps for now // Todo: Check if watchers can work in some cases with lightning and implement it if it's possible, this part will probably work if only the taker is lightning since the preimage is available @@ -1715,7 +1715,7 @@ impl TakerSwap { } // Todo: taker_payment should be a message on lightning network not a swap message - let msg = SwapMsg::TakerPayment(tx_hex); + let msg = SwapMsg::TakerPayment(tx_hex.0.clone()); let send_abort_handle = broadcast_swap_msg_every( self.ctx.clone(), swap_topic(&self.uuid), @@ -1738,16 +1738,20 @@ impl TakerSwap { Err(_) => self.r().data.taker_payment_lock, }; + let secret_hash = self.r().secret_hash.clone(); + let taker_coin_start_block = self.r().data.taker_coin_start_block; + let taker_coin_swap_contract_address = self.r().data.taker_coin_swap_contract_address.clone(); + let watcher_reward = self.r().watcher_reward; let f = self.taker_coin.wait_for_htlc_tx_spend(WaitForHTLCTxSpendArgs { - tx_bytes: &self.r().taker_payment.clone().unwrap().tx_hex, - secret_hash: &self.r().secret_hash.0, + tx_bytes: &tx_hex, + secret_hash: &secret_hash.0, wait_until, - from_block: self.r().data.taker_coin_start_block, - swap_contract_address: &self.r().data.taker_coin_swap_contract_address, + from_block: taker_coin_start_block, + swap_contract_address: &taker_coin_swap_contract_address, check_every: TAKER_PAYMENT_SPEND_SEARCH_INTERVAL, - watcher_reward: self.r().watcher_reward, + watcher_reward, }); - let tx = match f.compat().await { + let tx = match f.await { Ok(t) => t, Err(err) => { return Ok((Some(TakerSwapCommand::PrepareForTakerPaymentRefund), vec![ @@ -1767,8 +1771,6 @@ impl TakerSwap { tx_hash, }; - let secret_hash = self.r().secret_hash.clone(); - let watcher_reward = self.r().watcher_reward; let secret = match self .taker_coin .extract_secret(&secret_hash.0, &tx_ident.tx_hex, watcher_reward) diff --git a/mm2src/mm2_main/tests/docker_tests/qrc20_tests.rs b/mm2src/mm2_main/tests/docker_tests/qrc20_tests.rs index 352b383941..cfbd2df664 100644 --- a/mm2src/mm2_main/tests/docker_tests/qrc20_tests.rs +++ b/mm2src/mm2_main/tests/docker_tests/qrc20_tests.rs @@ -794,7 +794,7 @@ fn test_wait_for_tx_spend() { // first try to check if the wait_for_htlc_tx_spend() returns an error correctly let wait_until = wait_until_sec(5); - let tx_err = block_on_f01(maker_coin.wait_for_htlc_tx_spend(WaitForHTLCTxSpendArgs { + let tx_err = block_on(maker_coin.wait_for_htlc_tx_spend(WaitForHTLCTxSpendArgs { tx_bytes: &payment_tx_hex, secret_hash: &[], wait_until, @@ -831,7 +831,7 @@ fn test_wait_for_tx_spend() { }); let wait_until = wait_until_sec(120); - let found = block_on_f01(maker_coin.wait_for_htlc_tx_spend(WaitForHTLCTxSpendArgs { + let found = block_on(maker_coin.wait_for_htlc_tx_spend(WaitForHTLCTxSpendArgs { tx_bytes: &payment_tx_hex, secret_hash: &[], wait_until, From ece2db4db06a21126660b6da5b94d0bdb6ffeae1 Mon Sep 17 00:00:00 2001 From: shamardy Date: Thu, 7 Nov 2024 18:40:35 +0200 Subject: [PATCH 2/2] Refactor `wait_for_htlc_tx_spend` call in `swap_watcher.rs` to inline the `await` and `is_ok` check. --- mm2src/mm2_main/src/lp_swap/swap_watcher.rs | 25 ++++++++++++--------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/mm2src/mm2_main/src/lp_swap/swap_watcher.rs b/mm2src/mm2_main/src/lp_swap/swap_watcher.rs index be33003345..95a1d1e88a 100644 --- a/mm2src/mm2_main/src/lp_swap/swap_watcher.rs +++ b/mm2src/mm2_main/src/lp_swap/swap_watcher.rs @@ -346,17 +346,20 @@ impl State for WaitForTakerPaymentSpend { }, }; - let f = watcher_ctx.maker_coin.wait_for_htlc_tx_spend(WaitForHTLCTxSpendArgs { - tx_bytes: &maker_payment_hex, - secret_hash: &watcher_ctx.data.secret_hash, - wait_until, - from_block: watcher_ctx.data.maker_coin_start_block, - swap_contract_address: &None, - check_every: payment_search_interval, - watcher_reward: watcher_ctx.watcher_reward, - }); - - if f.await.is_ok() { + if watcher_ctx + .maker_coin + .wait_for_htlc_tx_spend(WaitForHTLCTxSpendArgs { + tx_bytes: &maker_payment_hex, + secret_hash: &watcher_ctx.data.secret_hash, + wait_until, + from_block: watcher_ctx.data.maker_coin_start_block, + swap_contract_address: &None, + check_every: payment_search_interval, + watcher_reward: watcher_ctx.watcher_reward, + }) + .await + .is_ok() + { info!("{}", MAKER_PAYMENT_SPEND_FOUND_LOG); return Self::change_state(Stopped::from_reason(StopReason::Finished( WatcherSuccess::MakerPaymentSpentByTaker,