diff --git a/mm2src/lp_network.rs b/mm2src/lp_network.rs index ee54ccf446..7e4ace2487 100644 --- a/mm2src/lp_network.rs +++ b/mm2src/lp_network.rs @@ -22,12 +22,13 @@ use common::mm_ctx::{MmArc, MmWeak}; use common::mm_error::prelude::*; use common::mm_metrics::{ClockOps, MetricsOps}; use derive_more::Display; -use futures::{channel::oneshot, lock::Mutex as AsyncMutex, StreamExt}; +use futures::{channel::oneshot, StreamExt}; use mm2_libp2p::atomicdex_behaviour::{AdexBehaviourCmd, AdexBehaviourEvent, AdexCmdTx, AdexEventRx, AdexResponse, AdexResponseChannel}; use mm2_libp2p::peers_exchange::PeerAddresses; use mm2_libp2p::{decode_message, encode_message, GossipsubMessage, MessageId, NetworkPorts, PeerId, TOPIC_SEPARATOR}; #[cfg(test)] use mocktopus::macros::*; +use parking_lot::Mutex as PaMutex; use serde::de; use std::net::ToSocketAddrs; use std::sync::Arc; @@ -62,14 +63,14 @@ pub enum P2PRequest { pub struct P2PContext { /// Using Mutex helps to prevent cloning which can actually result to channel being unbounded in case of using 1 tx clone per 1 message. - pub cmd_tx: AsyncMutex, + pub cmd_tx: PaMutex, } #[cfg_attr(test, mockable)] impl P2PContext { pub fn new(cmd_tx: AdexCmdTx) -> Self { P2PContext { - cmd_tx: AsyncMutex::new(cmd_tx), + cmd_tx: PaMutex::new(cmd_tx), } } @@ -103,7 +104,7 @@ pub async fn p2p_event_process_loop(ctx: MmWeak, mut rx: AdexEventRx, i_am_relay request, response_channel, }) => { - if let Err(e) = process_p2p_request(ctx, peer_id, request, response_channel).await { + if let Err(e) = process_p2p_request(ctx, peer_id, request, response_channel) { log::error!("Error on process P2P request: {:?}", e); } }, @@ -157,7 +158,7 @@ async fn process_p2p_message( } } -async fn process_p2p_request( +fn process_p2p_request( ctx: MmArc, _peer_id: PeerId, request: Vec, @@ -165,8 +166,8 @@ async fn process_p2p_request( ) -> P2PRequestResult<()> { let request = decode_message::(&request)?; let result = match request { - P2PRequest::Ordermatch(req) => lp_ordermatch::process_peer_request(ctx.clone(), req).await, - P2PRequest::NetworkInfo(req) => lp_stats::process_info_request(ctx.clone(), req).await, + P2PRequest::Ordermatch(req) => lp_ordermatch::process_peer_request(ctx.clone(), req), + P2PRequest::NetworkInfo(req) => lp_stats::process_info_request(ctx.clone(), req), }; let res = match result { @@ -180,7 +181,6 @@ async fn process_p2p_request( p2p_ctx .cmd_tx .lock() - .await .try_send(cmd) .map_to_mm(|e| P2PRequestError::SendError(e.to_string()))?; Ok(()) @@ -188,13 +188,11 @@ async fn process_p2p_request( pub fn broadcast_p2p_msg(ctx: &MmArc, topics: Vec, msg: Vec) { let ctx = ctx.clone(); - spawn(async move { - let cmd = AdexBehaviourCmd::PublishMsg { topics, msg }; - let p2p_ctx = P2PContext::fetch_from_mm_arc(&ctx); - if let Err(e) = p2p_ctx.cmd_tx.lock().await.try_send(cmd) { - log::error!("broadcast_p2p_msg cmd_tx.send error {:?}", e); - }; - }); + let cmd = AdexBehaviourCmd::PublishMsg { topics, msg }; + let p2p_ctx = P2PContext::fetch_from_mm_arc(&ctx); + if let Err(e) = p2p_ctx.cmd_tx.lock().try_send(cmd) { + log::error!("broadcast_p2p_msg cmd_tx.send error {:?}", e); + }; } /// Subscribe to the given `topic`. @@ -202,10 +200,10 @@ pub fn broadcast_p2p_msg(ctx: &MmArc, topics: Vec, msg: Vec) { /// # Safety /// /// The function locks the [`MmCtx::p2p_ctx`] mutex. -pub async fn subscribe_to_topic(ctx: &MmArc, topic: String) { +pub fn subscribe_to_topic(ctx: &MmArc, topic: String) { let p2p_ctx = P2PContext::fetch_from_mm_arc(ctx); let cmd = AdexBehaviourCmd::Subscribe { topic }; - if let Err(e) = p2p_ctx.cmd_tx.lock().await.try_send(cmd) { + if let Err(e) = p2p_ctx.cmd_tx.lock().try_send(cmd) { log::error!("subscribe_to_topic cmd_tx.send error {:?}", e); }; } @@ -225,7 +223,6 @@ pub async fn request_any_relay( p2p_ctx .cmd_tx .lock() - .await .try_send(cmd) .map_to_mm(|e| P2PRequestError::SendError(e.to_string()))?; match response_rx @@ -262,7 +259,6 @@ pub async fn request_relays( p2p_ctx .cmd_tx .lock() - .await .try_send(cmd) .map_to_mm(|e| P2PRequestError::SendError(e.to_string()))?; let responses = response_rx @@ -288,7 +284,6 @@ pub async fn request_peers( p2p_ctx .cmd_tx .lock() - .await .try_send(cmd) .map_to_mm(|e| P2PRequestError::SendError(e.to_string()))?; let responses = response_rx @@ -339,27 +334,23 @@ fn parse_peers_responses( pub fn propagate_message(ctx: &MmArc, message_id: MessageId, propagation_source: PeerId) { let ctx = ctx.clone(); - spawn(async move { - let p2p_ctx = P2PContext::fetch_from_mm_arc(&ctx); - let cmd = AdexBehaviourCmd::PropagateMessage { - message_id, - propagation_source, - }; - if let Err(e) = p2p_ctx.cmd_tx.lock().await.try_send(cmd) { - log::error!("propagate_message cmd_tx.send error {:?}", e); - }; - }); + let p2p_ctx = P2PContext::fetch_from_mm_arc(&ctx); + let cmd = AdexBehaviourCmd::PropagateMessage { + message_id, + propagation_source, + }; + if let Err(e) = p2p_ctx.cmd_tx.lock().try_send(cmd) { + log::error!("propagate_message cmd_tx.send error {:?}", e); + }; } pub fn add_reserved_peer_addresses(ctx: &MmArc, peer: PeerId, addresses: PeerAddresses) { let ctx = ctx.clone(); - spawn(async move { - let p2p_ctx = P2PContext::fetch_from_mm_arc(&ctx); - let cmd = AdexBehaviourCmd::AddReservedPeer { peer, addresses }; - if let Err(e) = p2p_ctx.cmd_tx.lock().await.try_send(cmd) { - log::error!("add_reserved_peer_addresses cmd_tx.send error {:?}", e); - }; - }); + let p2p_ctx = P2PContext::fetch_from_mm_arc(&ctx); + let cmd = AdexBehaviourCmd::AddReservedPeer { peer, addresses }; + if let Err(e) = p2p_ctx.cmd_tx.lock().try_send(cmd) { + log::error!("add_reserved_peer_addresses cmd_tx.send error {:?}", e); + }; } #[derive(Debug, Display)] diff --git a/mm2src/lp_ordermatch.rs b/mm2src/lp_ordermatch.rs index c39682952d..40423f3442 100644 --- a/mm2src/lp_ordermatch.rs +++ b/mm2src/lp_ordermatch.rs @@ -233,7 +233,6 @@ async fn process_orders_keep_alive( let to_request = ordermatch_ctx .orderbook .lock() - .await .process_keep_alive(&from_pubkey, keep_alive, i_am_relay); let req = match to_request { @@ -251,7 +250,7 @@ async fn process_orders_keep_alive( _ => return false, }; - let mut orderbook = ordermatch_ctx.orderbook.lock().await; + let mut orderbook = ordermatch_ctx.orderbook.lock(); for (pair, diff) in response.pair_orders_diff { let _new_root = match diff { DeltaOrFullTrie::Delta(delta) => { @@ -265,14 +264,10 @@ async fn process_orders_keep_alive( true } -async fn process_maker_order_updated( - ctx: MmArc, - from_pubkey: String, - updated_msg: new_protocol::MakerOrderUpdated, -) -> bool { +fn process_maker_order_updated(ctx: MmArc, from_pubkey: String, updated_msg: new_protocol::MakerOrderUpdated) -> bool { let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).expect("from_ctx failed"); let uuid = updated_msg.uuid(); - let mut orderbook = ordermatch_ctx.orderbook.lock().await; + let mut orderbook = ordermatch_ctx.orderbook.lock(); match orderbook.find_order_by_uuid_and_pubkey(&uuid, &from_pubkey) { Some(mut order) => { order.apply_updated(&updated_msg); @@ -329,7 +324,7 @@ async fn request_and_fill_orderbook(ctx: &MmArc, base: &str, rel: &str) -> Resul }; let ordermatch_ctx = OrdermatchContext::from_ctx(ctx).unwrap(); - let mut orderbook = ordermatch_ctx.orderbook.lock().await; + let mut orderbook = ordermatch_ctx.orderbook.lock(); let alb_pair = alb_ordered_pair(base, rel); for (pubkey, GetOrderbookPubkeyItem { orders, .. }) in pubkey_orders { @@ -357,16 +352,16 @@ async fn request_and_fill_orderbook(ctx: &MmArc, base: &str, rel: &str) -> Resul /// Insert or update an order `req`. /// Note this function locks the [`OrdermatchContext::orderbook`] async mutex. -async fn insert_or_update_order(ctx: &MmArc, item: OrderbookItem) { +fn insert_or_update_order(ctx: &MmArc, item: OrderbookItem) { let ordermatch_ctx = OrdermatchContext::from_ctx(ctx).expect("from_ctx failed"); - let mut orderbook = ordermatch_ctx.orderbook.lock().await; + let mut orderbook = ordermatch_ctx.orderbook.lock(); orderbook.insert_or_update_order_update_trie(item) } -async fn delete_order(ctx: &MmArc, pubkey: &str, uuid: Uuid) { +fn delete_order(ctx: &MmArc, pubkey: &str, uuid: Uuid) { let ordermatch_ctx = OrdermatchContext::from_ctx(ctx).expect("from_ctx failed"); - let mut orderbook = ordermatch_ctx.orderbook.lock().await; + let mut orderbook = ordermatch_ctx.orderbook.lock(); if let Some(order) = orderbook.order_set.get(&uuid) { if order.pubkey == pubkey { orderbook.remove_order_trie_update(uuid); @@ -374,9 +369,9 @@ async fn delete_order(ctx: &MmArc, pubkey: &str, uuid: Uuid) { } } -async fn delete_my_order(ctx: &MmArc, uuid: Uuid) { +fn delete_my_order(ctx: &MmArc, uuid: Uuid) { let ordermatch_ctx: Arc = OrdermatchContext::from_ctx(ctx).expect("from_ctx failed"); - let mut orderbook = ordermatch_ctx.orderbook.lock().await; + let mut orderbook = ordermatch_ctx.orderbook.lock(); orderbook.remove_order_trie_update(uuid); } @@ -425,7 +420,7 @@ pub async fn process_msg(ctx: MmArc, _topics: Vec, from_peer: String, ms match message { new_protocol::OrdermatchMessage::MakerOrderCreated(created_msg) => { let order: OrderbookItem = (created_msg, hex::encode(pubkey.to_bytes().as_slice())).into(); - insert_or_update_order(&ctx, order).await; + insert_or_update_order(&ctx, order); true }, new_protocol::OrdermatchMessage::PubkeyKeepAlive(keep_alive) => { @@ -451,11 +446,11 @@ pub async fn process_msg(ctx: MmArc, _topics: Vec, from_peer: String, ms true }, new_protocol::OrdermatchMessage::MakerOrderCancelled(cancelled_msg) => { - delete_order(&ctx, &pubkey.to_hex(), cancelled_msg.uuid.into()).await; + delete_order(&ctx, &pubkey.to_hex(), cancelled_msg.uuid.into()); true }, new_protocol::OrdermatchMessage::MakerOrderUpdated(updated_msg) => { - process_maker_order_updated(ctx, pubkey.to_hex(), updated_msg).await + process_maker_order_updated(ctx, pubkey.to_hex(), updated_msg) }, } }, @@ -526,20 +521,18 @@ impl TryFromBytes for Uuid { } } -pub async fn process_peer_request(ctx: MmArc, request: OrdermatchRequest) -> Result>, String> { +pub fn process_peer_request(ctx: MmArc, request: OrdermatchRequest) -> Result>, String> { log::debug!("Got ordermatch request {:?}", request); match request { - OrdermatchRequest::GetOrderbook { base, rel } => process_get_orderbook_request(ctx, base, rel).await, + OrdermatchRequest::GetOrderbook { base, rel } => process_get_orderbook_request(ctx, base, rel), OrdermatchRequest::SyncPubkeyOrderbookState { pubkey, trie_roots } => { - let response = process_sync_pubkey_orderbook_state(ctx, pubkey, trie_roots).await; + let response = process_sync_pubkey_orderbook_state(ctx, pubkey, trie_roots); response.map(|res| res.map(|r| encode_message(&r).expect("Serialization failed"))) }, OrdermatchRequest::BestOrders { coin, action, volume } => { - best_orders::process_best_orders_p2p_request(ctx, coin, action, volume).await - }, - OrdermatchRequest::OrderbookDepth { pairs } => { - orderbook_depth::process_orderbook_depth_p2p_request(ctx, pairs).await + best_orders::process_best_orders_p2p_request(ctx, coin, action, volume) }, + OrdermatchRequest::OrderbookDepth { pairs } => orderbook_depth::process_orderbook_depth_p2p_request(ctx, pairs), } } @@ -600,9 +593,9 @@ fn get_pubkeys_orders( (total_orders_number, uuids_by_pubkey, protocol_infos) } -async fn process_get_orderbook_request(ctx: MmArc, base: String, rel: String) -> Result>, String> { +fn process_get_orderbook_request(ctx: MmArc, base: String, rel: String) -> Result>, String> { let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).unwrap(); - let orderbook = ordermatch_ctx.orderbook.lock().await; + let orderbook = ordermatch_ctx.orderbook.lock(); let (total_orders_number, orders, protocol_infos) = get_pubkeys_orders(&orderbook, base, rel); if total_orders_number > MAX_ORDERS_NUMBER_IN_ORDERBOOK_RESPONSE { @@ -746,13 +739,13 @@ struct SyncPubkeyOrderbookStateRes { protocol_infos: HashMap, } -async fn process_sync_pubkey_orderbook_state( +fn process_sync_pubkey_orderbook_state( ctx: MmArc, pubkey: String, trie_roots: HashMap, ) -> Result, String> { let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).unwrap(); - let orderbook = ordermatch_ctx.orderbook.lock().await; + let orderbook = ordermatch_ctx.orderbook.lock(); let pubkey_state = match orderbook.pubkeys_state.get(&pubkey) { Some(s) => s, None => return Ok(None), @@ -860,7 +853,7 @@ fn test_parse_orderbook_pair_from_topic() { assert_eq!(None, parse_orderbook_pair_from_topic("orbk/BTC:")); } -async fn maker_order_created_p2p_notify( +fn maker_order_created_p2p_notify( ctx: MmArc, order: &MakerOrder, base_protocol_info: Vec, @@ -886,13 +879,13 @@ async fn maker_order_created_p2p_notify( let to_broadcast = new_protocol::OrdermatchMessage::MakerOrderCreated(message.clone()); let encoded_msg = encode_and_sign(&to_broadcast, &*key_pair.private().secret).unwrap(); let order: OrderbookItem = (message, hex::encode(&**key_pair.public())).into(); - insert_or_update_order(&ctx, order).await; + insert_or_update_order(&ctx, order); broadcast_p2p_msg(&ctx, vec![topic], encoded_msg); } -async fn process_my_maker_order_updated(ctx: &MmArc, message: &new_protocol::MakerOrderUpdated) { +fn process_my_maker_order_updated(ctx: &MmArc, message: &new_protocol::MakerOrderUpdated) { let ordermatch_ctx = OrdermatchContext::from_ctx(ctx).expect("from_ctx failed"); - let mut orderbook = ordermatch_ctx.orderbook.lock().await; + let mut orderbook = ordermatch_ctx.orderbook.lock(); let uuid = message.uuid(); if let Some(mut order) = orderbook.find_order_by_uuid(&uuid) { @@ -901,21 +894,21 @@ async fn process_my_maker_order_updated(ctx: &MmArc, message: &new_protocol::Mak } } -async fn maker_order_updated_p2p_notify(ctx: MmArc, topic: String, message: new_protocol::MakerOrderUpdated) { +fn maker_order_updated_p2p_notify(ctx: MmArc, topic: String, message: new_protocol::MakerOrderUpdated) { let msg: new_protocol::OrdermatchMessage = message.clone().into(); let key_pair = ctx.secp256k1_key_pair.or(&&|| panic!()); let encoded_msg = encode_and_sign(&msg, &*key_pair.private().secret).unwrap(); - process_my_maker_order_updated(&ctx, &message).await; + process_my_maker_order_updated(&ctx, &message); broadcast_p2p_msg(&ctx, vec![topic], encoded_msg); } -async fn maker_order_cancelled_p2p_notify(ctx: MmArc, order: &MakerOrder) { +fn maker_order_cancelled_p2p_notify(ctx: MmArc, order: &MakerOrder) { let message = new_protocol::OrdermatchMessage::MakerOrderCancelled(new_protocol::MakerOrderCancelled { uuid: order.uuid.into(), timestamp: now_ms() / 1000, pair_trie_root: H64::default(), }); - delete_my_order(&ctx, order.uuid).await; + delete_my_order(&ctx, order.uuid); log::debug!("maker_order_cancelled_p2p_notify called, message {:?}", message); broadcast_ordermatch_message(&ctx, vec![order.orderbook_topic()], message); } @@ -960,7 +953,7 @@ impl BalanceTradeFeeUpdatedHandler for BalanceUpdateOrdermatchHandler { // This checks that the order hasn't been removed by another process if removed_order_mutex.is_some() { // cancel the order - maker_order_cancelled_p2p_notify(ctx.clone(), &order).await; + maker_order_cancelled_p2p_notify(ctx.clone(), &order); delete_my_maker_order( ctx.clone(), order.clone(), @@ -977,7 +970,7 @@ impl BalanceTradeFeeUpdatedHandler for BalanceUpdateOrdermatchHandler { let mut update_msg = new_protocol::MakerOrderUpdated::new(order.uuid); update_msg.with_new_max_volume(new_volume.to_ratio()); order.apply_updated(&update_msg); - maker_order_updated_p2p_notify(ctx.clone(), order.orderbook_topic(), update_msg).await; + maker_order_updated_p2p_notify(ctx.clone(), order.orderbook_topic(), update_msg); } } } @@ -2082,7 +2075,7 @@ pub async fn broadcast_maker_orders_keep_alive_loop(ctx: MmArc) { while !ctx.is_stopping() { Timer::sleep(MIN_ORDER_KEEP_ALIVE_INTERVAL as f64).await; let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).expect("from_ctx failed"); - let orderbook = ordermatch_ctx.orderbook.lock().await; + let orderbook = ordermatch_ctx.orderbook.lock(); let state = match orderbook.pubkeys_state.get(&my_pubsecp) { Some(s) => s, None => continue, @@ -2510,7 +2503,7 @@ impl Orderbook { struct OrdermatchContext { pub my_maker_orders: PaMutex>>>, pub my_taker_orders: AsyncMutex>, - pub orderbook: AsyncMutex, + pub orderbook: PaMutex, /// The map from coin original ticker to the orderbook ticker /// It is used to share the same orderbooks for concurrently activated coins with different protocols /// E.g. BTC and BTC-Segwit @@ -2801,7 +2794,7 @@ pub async fn lp_ordermatch_loop(ctx: MmArc) { { // remove "timed out" pubkeys states with their orders from orderbook - let mut orderbook = ordermatch_ctx.orderbook.lock().await; + let mut orderbook = ordermatch_ctx.orderbook.lock(); let mut uuids_to_remove = vec![]; let mut pubkeys_to_remove = vec![]; for (pubkey, state) in orderbook.pubkeys_state.iter() { @@ -2825,50 +2818,53 @@ pub async fn lp_ordermatch_loop(ctx: MmArc) { } { - let my_maker_orders = ordermatch_ctx.my_maker_orders.lock().clone(); + let mut missing_uuids = Vec::new(); + { + let orderbook = ordermatch_ctx.orderbook.lock(); + for (uuid, _) in ordermatch_ctx.my_maker_orders.lock().iter() { + if !orderbook.order_set.contains_key(uuid) { + missing_uuids.push(*uuid); + } + } + } - for (uuid, order_mutex) in my_maker_orders.iter() { - let order = order_mutex.lock().await; - if !ordermatch_ctx.orderbook.lock().await.order_set.contains_key(uuid) { - if let Ok(Some((base, rel))) = find_pair(&ctx, &order.base, &order.rel).await { - if let Err(e) = order.check_balance(&ctx, &base, &rel).await { - log::info!("Error {} on balance check to kickstart order {}, cancelling", e, uuid); - let removed_order_mutex = ordermatch_ctx.my_maker_orders.lock().remove(uuid); - // This checks that the order hasn't been removed by another process - if removed_order_mutex.is_some() { - delete_my_maker_order( - ctx.clone(), - order.clone(), - MakerOrderCancellationReason::InsufficientBalance, - ) - .compat() - .await - .ok(); - } - continue; - } + for uuid in missing_uuids { + let order_mutex = match ordermatch_ctx.my_maker_orders.lock().get(&uuid) { + Some(o) => o.clone(), + None => continue, + }; - let topic = order.orderbook_topic(); - if !ordermatch_ctx.orderbook.lock().await.is_subscribed_to(&topic) { - let request_orderbook = false; - if let Err(e) = subscribe_to_orderbook_topic( - &ctx, - &order.base_orderbook_ticker(), - &order.rel_orderbook_ticker(), - request_orderbook, + let order = order_mutex.lock().await; + if let Ok(Some((base, rel))) = find_pair(&ctx, &order.base, &order.rel).await { + if let Err(e) = order.check_balance(&ctx, &base, &rel).await { + log::info!("Error {} on balance check to kickstart order {}, cancelling", e, uuid); + let removed_order_mutex = ordermatch_ctx.my_maker_orders.lock().remove(&uuid); + // This checks that the order hasn't been removed by another process + if removed_order_mutex.is_some() { + delete_my_maker_order( + ctx.clone(), + order.clone(), + MakerOrderCancellationReason::InsufficientBalance, ) + .compat() .await - { - log::error!("Error {} on subscribing to orderbook topic {}", e, topic); - } + .ok(); } + continue; + } + + let maker_orders = ordermatch_ctx.my_maker_orders.lock(); + + // notify other nodes only if maker order is still there keeping maker_orders locked during the operation + if maker_orders.contains_key(&uuid) { + let topic = order.orderbook_topic(); + subscribe_to_topic(&ctx, topic); maker_order_created_p2p_notify( ctx.clone(), &order, base.coin_protocol_info(), rel.coin_protocol_info(), - ) - .await; + ); } } } @@ -2890,7 +2886,7 @@ pub async fn clean_memory_loop(ctx_weak: MmWeak) { } let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).unwrap(); - let mut orderbook = ordermatch_ctx.orderbook.lock().await; + let mut orderbook = ordermatch_ctx.orderbook.lock(); orderbook.memory_db.purge(); } Timer::sleep(600.).await; @@ -2952,8 +2948,7 @@ async fn handle_timed_out_taker_orders(ctx: MmArc, ordermatch_ctx: &OrdermatchCo &maker_order, base.coin_protocol_info(), rel.coin_protocol_info(), - ) - .await; + ); } } @@ -2980,7 +2975,7 @@ async fn check_balance_for_maker_orders(ctx: MmArc, ordermatch_ctx: &OrdermatchC let removed_order_mutex = ordermatch_ctx.my_maker_orders.lock().remove(&uuid); // This checks that the order hasn't been removed by another process if removed_order_mutex.is_some() { - maker_order_cancelled_p2p_notify(ctx.clone(), &order).await; + maker_order_cancelled_p2p_notify(ctx.clone(), &order); delete_my_maker_order(ctx.clone(), order.clone(), reason) .compat() .await @@ -3258,7 +3253,7 @@ async fn process_taker_connect(ctx: MmArc, sender_pubkey: H256Json, connect_msg: if my_order.available_amount() >= my_order.min_base_vol { let mut updated_msg = new_protocol::MakerOrderUpdated::new(my_order.uuid); updated_msg.with_new_max_volume(my_order.available_amount().into()); - maker_order_updated_p2p_notify(ctx.clone(), topic, updated_msg).await; + maker_order_updated_p2p_notify(ctx.clone(), topic, updated_msg); } MyOrdersStorage::new(ctx) .update_active_maker_order(&my_order) @@ -4090,8 +4085,7 @@ pub async fn create_maker_order(ctx: &MmArc, req: SetPriceReq) -> Result Result return ERR!("Error on saving updated order state to database:{}", e); } update_msg.with_new_max_volume((new_volume - reserved_amount).into()); - maker_order_updated_p2p_notify(ctx.clone(), order.orderbook_topic(), update_msg).await; + maker_order_updated_p2p_notify(ctx.clone(), order.orderbook_topic(), update_msg); Ok(order.clone()) } @@ -4531,7 +4525,7 @@ pub async fn cancel_order(ctx: MmArc, req: CancelOrderReq) -> Result Result> let removed_order_mutex = ordermatch_ctx.my_maker_orders.lock().remove(&req.uuid); // This checks that the order hasn't been removed by another process if removed_order_mutex.is_some() { - maker_order_cancelled_p2p_notify(ctx.clone(), &order).await; + maker_order_cancelled_p2p_notify(ctx.clone(), &order); delete_my_maker_order(ctx, order.clone(), MakerOrderCancellationReason::Cancelled) .compat() .await @@ -4892,7 +4886,7 @@ pub async fn cancel_orders_by(ctx: &MmArc, cancel_by: CancelBy) -> Result<(Vec { @@ -4961,7 +4955,7 @@ pub(self) async fn subscribe_to_orderbook_topic( e.insert(OrderbookRequestingState::NotRequested { subscribed_at: current_timestamp, }); - subscribe_to_topic(ctx, topic.clone()).await; + subscribe_to_topic(ctx, topic.clone()); // orderbook is not filled false }, diff --git a/mm2src/lp_ordermatch/best_orders.rs b/mm2src/lp_ordermatch/best_orders.rs index 4bd07002cb..fd5e6e5923 100644 --- a/mm2src/lp_ordermatch/best_orders.rs +++ b/mm2src/lp_ordermatch/best_orders.rs @@ -33,14 +33,14 @@ struct BestOrdersRes { protocol_infos: HashMap, } -pub async fn process_best_orders_p2p_request( +pub fn process_best_orders_p2p_request( ctx: MmArc, coin: String, action: BestOrdersAction, required_volume: BigRational, ) -> Result>, String> { let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).expect("ordermatch_ctx must exist at this point"); - let orderbook = ordermatch_ctx.orderbook.lock().await; + let orderbook = ordermatch_ctx.orderbook.lock(); let search_pairs_in = match action { BestOrdersAction::Buy => &orderbook.pairs_existing_for_base, BestOrdersAction::Sell => &orderbook.pairs_existing_for_rel, diff --git a/mm2src/lp_ordermatch/orderbook_depth.rs b/mm2src/lp_ordermatch/orderbook_depth.rs index 7c532b473e..1a5e8dc800 100644 --- a/mm2src/lp_ordermatch/orderbook_depth.rs +++ b/mm2src/lp_ordermatch/orderbook_depth.rs @@ -44,36 +44,34 @@ pub async fn orderbook_depth_rpc(ctx: MmArc, req: Json) -> Result = req - .pairs - .into_iter() - .filter_map(|original_pair| { - let orderbook_pair = ordermatch_ctx.orderbook_pair_bypass(&original_pair); - let topic = orderbook_topic_from_base_rel(&orderbook_pair.0, &orderbook_pair.1); - if orderbook.is_subscribed_to(&topic) { - let asks = orderbook - .unordered - .get(&orderbook_pair) - .map_or(0, |orders| orders.len()); - let reversed = (orderbook_pair.1, orderbook_pair.0); - let bids = orderbook.unordered.get(&reversed).map_or(0, |orders| orders.len()); - result.push(PairWithDepth { - pair: original_pair, - depth: PairDepth { asks, bids }, - }); - None - } else { - Some((orderbook_pair, original_pair)) - } - }) - .collect(); + let mut to_request_from_relay: Vec<_> = { + let orderbook = ordermatch_ctx.orderbook.lock(); + req.pairs + .into_iter() + .filter_map(|original_pair| { + let orderbook_pair = ordermatch_ctx.orderbook_pair_bypass(&original_pair); + let topic = orderbook_topic_from_base_rel(&orderbook_pair.0, &orderbook_pair.1); + if orderbook.is_subscribed_to(&topic) { + let asks = orderbook + .unordered + .get(&orderbook_pair) + .map_or(0, |orders| orders.len()); + let reversed = (orderbook_pair.1, orderbook_pair.0); + let bids = orderbook.unordered.get(&reversed).map_or(0, |orders| orders.len()); + result.push(PairWithDepth { + pair: original_pair, + depth: PairDepth { asks, bids }, + }); + None + } else { + Some((orderbook_pair, original_pair)) + } + }) + .collect() + }; - // avoid locking orderbook for long time during P2P request - drop(orderbook); if !to_request_from_relay.is_empty() { let p2p_request = OrdermatchRequest::OrderbookDepth { pairs: to_request_from_relay @@ -113,12 +111,12 @@ pub async fn orderbook_depth_rpc(ctx: MmArc, req: Json) -> Result, ) -> Result>, String> { let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).expect("ordermatch_ctx must exist at this point"); - let orderbook = ordermatch_ctx.orderbook.lock().await; + let orderbook = ordermatch_ctx.orderbook.lock(); let depth = pairs .into_iter() .map(|pair| { diff --git a/mm2src/lp_ordermatch/orderbook_rpc.rs b/mm2src/lp_ordermatch/orderbook_rpc.rs index de76c8146a..20889f13b1 100644 --- a/mm2src/lp_ordermatch/orderbook_rpc.rs +++ b/mm2src/lp_ordermatch/orderbook_rpc.rs @@ -103,7 +103,7 @@ pub async fn orderbook_rpc(ctx: MmArc, req: Json) -> Result>, S } try_s!(subscribe_to_orderbook_topic(&ctx, &base_ticker, &rel_ticker, request_orderbook).await); - let orderbook = ordermatch_ctx.orderbook.lock().await; + let orderbook = ordermatch_ctx.orderbook.lock(); let my_pubsecp = try_s!(CryptoCtx::from_ctx(&ctx)).secp256k1_pubkey_hex(); let mut asks = match orderbook.unordered.get(&(base_ticker.clone(), rel_ticker.clone())) { diff --git a/mm2src/lp_stats.rs b/mm2src/lp_stats.rs index f3051cd07e..d544e5ba88 100644 --- a/mm2src/lp_stats.rs +++ b/mm2src/lp_stats.rs @@ -175,16 +175,16 @@ pub enum NetworkInfoRequest { GetMm2Version, } -async fn process_get_version_request(ctx: MmArc) -> Result>, String> { +fn process_get_version_request(ctx: MmArc) -> Result>, String> { let response = ctx.mm_version().to_string(); let encoded = try_s!(encode_message(&response)); Ok(Some(encoded)) } -pub async fn process_info_request(ctx: MmArc, request: NetworkInfoRequest) -> Result>, String> { +pub fn process_info_request(ctx: MmArc, request: NetworkInfoRequest) -> Result>, String> { log::debug!("Got stats request {:?}", request); match request { - NetworkInfoRequest::GetMm2Version => process_get_version_request(ctx).await, + NetworkInfoRequest::GetMm2Version => process_get_version_request(ctx), } } diff --git a/mm2src/lp_swap/maker_swap.rs b/mm2src/lp_swap/maker_swap.rs index 496a4ccbc5..ae1b616b4c 100644 --- a/mm2src/lp_swap/maker_swap.rs +++ b/mm2src/lp_swap/maker_swap.rs @@ -1539,7 +1539,7 @@ pub async fn run_maker_swap(swap: RunMakerSwapInput, ctx: MmArc) { ); let ctx = swap.ctx.clone(); - subscribe_to_topic(&ctx, swap_topic(&swap.uuid)).await; + subscribe_to_topic(&ctx, swap_topic(&swap.uuid)); let mut status = ctx.log.status_handle(); let uuid_str = swap.uuid.to_string(); macro_rules! swap_tags { diff --git a/mm2src/lp_swap/taker_swap.rs b/mm2src/lp_swap/taker_swap.rs index a5479e29d4..359953723c 100644 --- a/mm2src/lp_swap/taker_swap.rs +++ b/mm2src/lp_swap/taker_swap.rs @@ -333,7 +333,7 @@ pub async fn run_taker_swap(swap: RunTakerSwapInput, ctx: MmArc) { ); let ctx = swap.ctx.clone(); - subscribe_to_topic(&ctx, swap_topic(&swap.uuid)).await; + subscribe_to_topic(&ctx, swap_topic(&swap.uuid)); let mut status = ctx.log.status_handle(); let uuid = swap.uuid.to_string(); let running_swap = Arc::new(swap); diff --git a/mm2src/ordermatch_tests.rs b/mm2src/ordermatch_tests.rs index 5f8be87afb..dc6f803b7a 100644 --- a/mm2src/ordermatch_tests.rs +++ b/mm2src/ordermatch_tests.rs @@ -1644,7 +1644,7 @@ fn p2p_context_mock() -> (mpsc::Sender, mpsc::Receiver(&encoded).unwrap(); for (pubkey, item) in orderbook.pubkey_orders { @@ -1728,7 +1724,7 @@ fn test_process_get_orderbook_request_limit() { let ordermatch_ctx_clone = ordermatch_ctx.clone(); OrdermatchContext::from_ctx.mock_safe(move |_| MockResult::Return(Ok(ordermatch_ctx_clone.clone()))); - let mut orderbook = block_on(ordermatch_ctx.orderbook.lock()); + let mut orderbook = ordermatch_ctx.orderbook.lock(); let orders = make_random_orders( pubkey, @@ -1745,13 +1741,9 @@ fn test_process_get_orderbook_request_limit() { // avoid dead lock on orderbook as process_get_orderbook_request also acquires it drop(orderbook); - let err = block_on(process_get_orderbook_request( - ctx.clone(), - "RICK".into(), - "MORTY".into(), - )) - .err() - .expect("Expected an error"); + let err = process_get_orderbook_request(ctx.clone(), "RICK".into(), "MORTY".into()) + .err() + .expect("Expected an error"); log!("error: "(err)); assert!(err.contains("Orderbook too large")); @@ -1787,7 +1779,7 @@ fn test_request_and_fill_orderbook() { { let (pubkey, secret) = &other_pubkeys[0]; for extra_order in make_random_orders(pubkey.clone(), secret, "RICK".into(), "MORTY".into(), 2) { - block_on(insert_or_update_order(&ctx, extra_order)); + insert_or_update_order(&ctx, extra_order); } } @@ -1834,7 +1826,7 @@ fn test_request_and_fill_orderbook() { // check if the best asks and bids are in the orderbook let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).unwrap(); - let orderbook = block_on(ordermatch_ctx.orderbook.lock()); + let orderbook = ordermatch_ctx.orderbook.lock(); let expected = expected_orders .iter() @@ -2216,7 +2208,7 @@ fn test_orderbook_insert_or_update_order() { fn pair_trie_root_by_pub(ctx: &MmArc, pubkey: &str, pair: &str) -> H64 { let ordermatch_ctx = OrdermatchContext::from_ctx(ctx).unwrap(); - let orderbook = block_on(ordermatch_ctx.orderbook.lock()); + let orderbook = ordermatch_ctx.orderbook.lock(); *orderbook .pubkeys_state .get(pubkey) @@ -2228,13 +2220,13 @@ fn pair_trie_root_by_pub(ctx: &MmArc, pubkey: &str, pair: &str) -> H64 { fn clone_orderbook_memory_db(ctx: &MmArc) -> MemoryDB { let ordermatch_ctx = OrdermatchContext::from_ctx(ctx).unwrap(); - let orderbook = block_on(ordermatch_ctx.orderbook.lock()); + let orderbook = ordermatch_ctx.orderbook.lock(); orderbook.memory_db.clone() } fn remove_order(ctx: &MmArc, uuid: Uuid) { let ordermatch_ctx = OrdermatchContext::from_ctx(ctx).unwrap(); - let mut orderbook = block_on(ordermatch_ctx.orderbook.lock()); + let mut orderbook = ordermatch_ctx.orderbook.lock(); orderbook.remove_order_trie_update(uuid); } @@ -2244,7 +2236,7 @@ fn test_process_sync_pubkey_orderbook_state_after_new_orders_added() { let orders = make_random_orders(pubkey.clone(), &secret, "C1".into(), "C2".into(), 100); for order in orders { - block_on(insert_or_update_order(&ctx, order)); + insert_or_update_order(&ctx, order); } let alb_ordered_pair = alb_ordered_pair("C1", "C2"); @@ -2256,16 +2248,12 @@ fn test_process_sync_pubkey_orderbook_state_after_new_orders_added() { let new_orders = make_random_orders(pubkey.clone(), &secret, "C1".into(), "C2".into(), 100); for order in new_orders { - block_on(insert_or_update_order(&ctx, order.clone())); + insert_or_update_order(&ctx, order.clone()); } - let mut result = block_on(process_sync_pubkey_orderbook_state( - ctx.clone(), - pubkey.clone(), - prev_pairs_state, - )) - .unwrap() - .unwrap(); + let mut result = process_sync_pubkey_orderbook_state(ctx.clone(), pubkey.clone(), prev_pairs_state) + .unwrap() + .unwrap(); // check pair trie root let expected_root_hash = pair_trie_root_by_pub(&ctx, &pubkey, &alb_ordered_pair); @@ -2298,17 +2286,17 @@ fn test_diff_should_not_be_written_if_hash_not_changed_on_insert() { let orders = make_random_orders(pubkey.clone(), &secret, "C1".into(), "C2".into(), 100); for order in orders.clone() { - block_on(insert_or_update_order(&ctx, order)); + insert_or_update_order(&ctx, order); } let alb_ordered_pair = alb_ordered_pair("C1", "C2"); let pair_trie_root = pair_trie_root_by_pub(&ctx, &pubkey, &alb_ordered_pair); for order in orders.clone() { - block_on(insert_or_update_order(&ctx, order)); + insert_or_update_order(&ctx, order); } let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).unwrap(); - let orderbook = block_on(ordermatch_ctx.orderbook.lock()); + let orderbook = ordermatch_ctx.orderbook.lock(); let pubkey_state = orderbook.pubkeys_state.get(&pubkey).unwrap(); assert!(!pubkey_state .order_pairs_trie_state_history @@ -2323,7 +2311,7 @@ fn test_process_sync_pubkey_orderbook_state_after_orders_removed() { let orders = make_random_orders(pubkey.clone(), &secret, "C1".into(), "C2".into(), 100); for order in orders.clone() { - block_on(insert_or_update_order(&ctx, order)); + insert_or_update_order(&ctx, order); } let alb_ordered_pair = alb_ordered_pair("C1", "C2"); @@ -2340,13 +2328,9 @@ fn test_process_sync_pubkey_orderbook_state_after_orders_removed() { remove_order(&ctx, order.uuid); } - let mut result = block_on(process_sync_pubkey_orderbook_state( - ctx.clone(), - pubkey.clone(), - prev_pairs_state, - )) - .unwrap() - .unwrap(); + let mut result = process_sync_pubkey_orderbook_state(ctx.clone(), pubkey.clone(), prev_pairs_state) + .unwrap() + .unwrap(); // check pair trie root let expected_root_hash = pair_trie_root_by_pub(&ctx, &pubkey, &alb_ordered_pair); @@ -2373,7 +2357,7 @@ fn test_diff_should_not_be_written_if_hash_not_changed_on_remove() { let orders = make_random_orders(pubkey.clone(), &secret, "C1".into(), "C2".into(), 100); for order in orders.clone() { - block_on(insert_or_update_order(&ctx, order)); + insert_or_update_order(&ctx, order); } let to_remove: Vec<_> = orders @@ -2391,7 +2375,7 @@ fn test_diff_should_not_be_written_if_hash_not_changed_on_remove() { let pair_trie_root = pair_trie_root_by_pub(&ctx, &pubkey, &alb_ordered_pair); let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).unwrap(); - let orderbook = block_on(ordermatch_ctx.orderbook.lock()); + let orderbook = ordermatch_ctx.orderbook.lock(); let pubkey_state = orderbook.pubkeys_state.get(&pubkey).unwrap(); assert!(!pubkey_state .order_pairs_trie_state_history @@ -2505,7 +2489,7 @@ fn test_process_sync_pubkey_orderbook_state_points_to_not_uptodate_trie_root() { .expect("Expected one order"); for order in orders.iter() { - block_on(insert_or_update_order(&ctx, order.clone())); + insert_or_update_order(&ctx, order.clone()); } let alb_pair = alb_ordered_pair("RICK", "MORTY"); @@ -2513,7 +2497,7 @@ fn test_process_sync_pubkey_orderbook_state_points_to_not_uptodate_trie_root() { // update trie root by adding a new order and do not update history let (old_root, _new_root) = { let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).unwrap(); - let mut orderbook = block_on(ordermatch_ctx.orderbook.lock()); + let mut orderbook = ordermatch_ctx.orderbook.lock(); log!([pubkey]", found "[orderbook.pubkeys_state.keys()]); let old_root = *orderbook @@ -2548,7 +2532,7 @@ fn test_process_sync_pubkey_orderbook_state_points_to_not_uptodate_trie_root() { let SyncPubkeyOrderbookStateRes { mut pair_orders_diff, .. - } = block_on(process_sync_pubkey_orderbook_state(ctx, pubkey, roots)) + } = process_sync_pubkey_orderbook_state(ctx, pubkey, roots) .expect("!process_sync_pubkey_orderbook_state") .expect("Expected MORTY:RICK delta, returned None"); @@ -2652,13 +2636,13 @@ fn test_remove_and_purge_pubkey_pair_orders() { let rick_kmd_orders = make_random_orders(pubkey.clone(), &secret, "RICK".into(), "KMD".into(), 10); for order in rick_morty_orders.iter().chain(rick_kmd_orders.iter()) { - block_on(insert_or_update_order(&ctx, order.clone())); + insert_or_update_order(&ctx, order.clone()); } let rick_morty_pair = alb_ordered_pair("RICK", "MORTY"); let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).unwrap(); - let mut orderbook = block_on(ordermatch_ctx.orderbook.lock()); + let mut orderbook = ordermatch_ctx.orderbook.lock(); remove_pubkey_pair_orders(&mut orderbook, &pubkey, &rick_morty_pair); check_if_orderbook_contains_only(&orderbook, &pubkey, &rick_kmd_orders); @@ -2672,17 +2656,17 @@ fn test_orderbook_sync_trie_diff_time_cache() { let rick_morty_pair = alb_ordered_pair("RICK", "MORTY"); for order in &rick_morty_orders[..5] { - block_on(insert_or_update_order(&ctx_bob, order.clone())); + insert_or_update_order(&ctx_bob, order.clone()); } std::thread::sleep(Duration::from_secs(3)); for order in &rick_morty_orders[5..10] { - block_on(insert_or_update_order(&ctx_bob, order.clone())); + insert_or_update_order(&ctx_bob, order.clone()); } let ordermatch_ctx_bob = OrdermatchContext::from_ctx(&ctx_bob).unwrap(); - let orderbook_bob = block_on(ordermatch_ctx_bob.orderbook.lock()); + let orderbook_bob = ordermatch_ctx_bob.orderbook.lock(); let bob_state = orderbook_bob.pubkeys_state.get(&pubkey_bob).unwrap(); let rick_morty_history_bob = bob_state.order_pairs_trie_state_history.get(&rick_morty_pair).unwrap(); assert_eq!(rick_morty_history_bob.len(), 5); @@ -2691,11 +2675,11 @@ fn test_orderbook_sync_trie_diff_time_cache() { let (ctx_alice, ..) = make_ctx_for_tests(); for order in &rick_morty_orders[..3] { - block_on(insert_or_update_order(&ctx_alice, order.clone())); + insert_or_update_order(&ctx_alice, order.clone()); } let ordermatch_ctx_alice = OrdermatchContext::from_ctx(&ctx_alice).unwrap(); - let mut orderbook_alice = block_on(ordermatch_ctx_alice.orderbook.lock()); + let mut orderbook_alice = ordermatch_ctx_alice.orderbook.lock(); let bob_state_on_alice_side = orderbook_alice.pubkeys_state.get(&pubkey_bob).unwrap(); let alice_root = bob_state_on_alice_side.trie_roots.get(&rick_morty_pair).unwrap(); @@ -2732,17 +2716,17 @@ fn test_orderbook_sync_trie_diff_time_cache() { drop(orderbook_alice); for order in &rick_morty_orders[10..] { - block_on(insert_or_update_order(&ctx_bob, order.clone())); + insert_or_update_order(&ctx_bob, order.clone()); } - let mut orderbook_bob = block_on(ordermatch_ctx_bob.orderbook.lock()); + let mut orderbook_bob = ordermatch_ctx_bob.orderbook.lock(); orderbook_bob.remove_order_trie_update(rick_morty_orders[12].uuid); let bob_state = orderbook_bob.pubkeys_state.get(&pubkey_bob).unwrap(); let rick_morty_history_bob = bob_state.order_pairs_trie_state_history.get(&rick_morty_pair).unwrap(); - let mut orderbook_alice = block_on(ordermatch_ctx_alice.orderbook.lock()); + let mut orderbook_alice = ordermatch_ctx_alice.orderbook.lock(); let bob_state_on_alice_side = orderbook_alice.pubkeys_state.get(&pubkey_bob).unwrap(); let alice_root = bob_state_on_alice_side.trie_roots.get(&rick_morty_pair).unwrap(); @@ -2784,7 +2768,7 @@ fn test_orderbook_order_pairs_trie_state_history_updates_expiration_on_insert() let rick_morty_pair = alb_ordered_pair("RICK", "MORTY"); for order in &rick_morty_orders[..5] { - block_on(insert_or_update_order(&ctx_bob, order.clone())); + insert_or_update_order(&ctx_bob, order.clone()); } // After 3 seconds RICK:MORTY pair trie state history will time out and will be empty @@ -2792,11 +2776,11 @@ fn test_orderbook_order_pairs_trie_state_history_updates_expiration_on_insert() // Insert some more orders to remove expired timecache RICK:MORTY key for order in &rick_morty_orders[5..10] { - block_on(insert_or_update_order(&ctx_bob, order.clone())); + insert_or_update_order(&ctx_bob, order.clone()); } let ordermatch_ctx_bob = OrdermatchContext::from_ctx(&ctx_bob).unwrap(); - let orderbook_bob = block_on(ordermatch_ctx_bob.orderbook.lock()); + let orderbook_bob = ordermatch_ctx_bob.orderbook.lock(); let bob_state = orderbook_bob.pubkeys_state.get(&pubkey_bob).unwrap(); // Only the last inserted 5 orders are found @@ -2815,11 +2799,11 @@ fn test_orderbook_order_pairs_trie_state_history_updates_expiration_on_insert() // On inserting 5 more orders expiration for RICK:MORTY pair trie state history will be reset for order in &rick_morty_orders[10..] { - block_on(insert_or_update_order(&ctx_bob, order.clone())); + insert_or_update_order(&ctx_bob, order.clone()); } let ordermatch_ctx_bob = OrdermatchContext::from_ctx(&ctx_bob).unwrap(); - let orderbook_bob = block_on(ordermatch_ctx_bob.orderbook.lock()); + let orderbook_bob = ordermatch_ctx_bob.orderbook.lock(); let bob_state = orderbook_bob.pubkeys_state.get(&pubkey_bob).unwrap(); assert_eq!( @@ -2836,7 +2820,7 @@ fn test_orderbook_order_pairs_trie_state_history_updates_expiration_on_insert() std::thread::sleep(Duration::from_secs(1)); let ordermatch_ctx_bob = OrdermatchContext::from_ctx(&ctx_bob).unwrap(); - let orderbook_bob = block_on(ordermatch_ctx_bob.orderbook.lock()); + let orderbook_bob = ordermatch_ctx_bob.orderbook.lock(); let bob_state = orderbook_bob.pubkeys_state.get(&pubkey_bob).unwrap(); // After 3 seconds from inserting orders number 6-10 these orders have not expired due to updated expiration on inserting orders 11-15 diff --git a/mm2src/rpc/lp_commands.rs b/mm2src/rpc/lp_commands.rs index 9fb988d4c4..781dc65a87 100644 --- a/mm2src/rpc/lp_commands.rs +++ b/mm2src/rpc/lp_commands.rs @@ -253,7 +253,7 @@ pub async fn get_peers_info(ctx: MmArc) -> Result>, String> { use crate::mm2::lp_network::P2PContext; use mm2_libp2p::atomicdex_behaviour::get_peers_info; let ctx = P2PContext::fetch_from_mm_arc(&ctx); - let cmd_tx = ctx.cmd_tx.lock().await.clone(); + let cmd_tx = ctx.cmd_tx.lock().clone(); let result = get_peers_info(cmd_tx).await; let result = json!({ "result": result, @@ -266,7 +266,7 @@ pub async fn get_gossip_mesh(ctx: MmArc) -> Result>, String> { use crate::mm2::lp_network::P2PContext; use mm2_libp2p::atomicdex_behaviour::get_gossip_mesh; let ctx = P2PContext::fetch_from_mm_arc(&ctx); - let cmd_tx = ctx.cmd_tx.lock().await.clone(); + let cmd_tx = ctx.cmd_tx.lock().clone(); let result = get_gossip_mesh(cmd_tx).await; let result = json!({ "result": result, @@ -279,7 +279,7 @@ pub async fn get_gossip_peer_topics(ctx: MmArc) -> Result>, Str use crate::mm2::lp_network::P2PContext; use mm2_libp2p::atomicdex_behaviour::get_gossip_peer_topics; let ctx = P2PContext::fetch_from_mm_arc(&ctx); - let cmd_tx = ctx.cmd_tx.lock().await.clone(); + let cmd_tx = ctx.cmd_tx.lock().clone(); let result = get_gossip_peer_topics(cmd_tx).await; let result = json!({ "result": result, @@ -292,7 +292,7 @@ pub async fn get_gossip_topic_peers(ctx: MmArc) -> Result>, Str use crate::mm2::lp_network::P2PContext; use mm2_libp2p::atomicdex_behaviour::get_gossip_topic_peers; let ctx = P2PContext::fetch_from_mm_arc(&ctx); - let cmd_tx = ctx.cmd_tx.lock().await.clone(); + let cmd_tx = ctx.cmd_tx.lock().clone(); let result = get_gossip_topic_peers(cmd_tx).await; let result = json!({ "result": result, @@ -305,7 +305,7 @@ pub async fn get_relay_mesh(ctx: MmArc) -> Result>, String> { use crate::mm2::lp_network::P2PContext; use mm2_libp2p::atomicdex_behaviour::get_relay_mesh; let ctx = P2PContext::fetch_from_mm_arc(&ctx); - let cmd_tx = ctx.cmd_tx.lock().await.clone(); + let cmd_tx = ctx.cmd_tx.lock().clone(); let result = get_relay_mesh(cmd_tx).await; let result = json!({ "result": result,