From 4343c0eaa90cede5fbee8ef27a01fea64eef72c2 Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Wed, 30 Oct 2024 14:37:30 +0300 Subject: [PATCH 01/22] add time validation core logic Signed-off-by: onur-ozkan --- mm2src/mm2_main/src/lp_stats.rs | 7 +++ .../request_response/network_info.rs | 2 + mm2src/mm2_p2p/src/behaviours/atomicdex.rs | 43 ++++++++++++++++++- 3 files changed, 51 insertions(+), 1 deletion(-) diff --git a/mm2src/mm2_main/src/lp_stats.rs b/mm2src/mm2_main/src/lp_stats.rs index 185996ecd1..e1ef7c844a 100644 --- a/mm2src/mm2_main/src/lp_stats.rs +++ b/mm2src/mm2_main/src/lp_stats.rs @@ -176,10 +176,17 @@ fn process_get_version_request(ctx: MmArc) -> Result>, String> { Ok(Some(encoded)) } +fn process_get_peer_utc_timestamp_request(_ctx: MmArc) -> Result>, String> { + let timestamp = common::get_utc_timestamp(); + let encoded = try_s!(encode_message(×tamp)); + Ok(Some(encoded)) +} + pub fn process_info_request(ctx: MmArc, request: NetworkInfoRequest) -> Result>, String> { log::debug!("Got stats request {:?}", request); match request { NetworkInfoRequest::GetMm2Version => process_get_version_request(ctx), + NetworkInfoRequest::GetPeerUtcTimestamp => process_get_peer_utc_timestamp_request(ctx), } } diff --git a/mm2src/mm2_p2p/src/application/request_response/network_info.rs b/mm2src/mm2_p2p/src/application/request_response/network_info.rs index c8dece2ef5..4d610d932c 100644 --- a/mm2src/mm2_p2p/src/application/request_response/network_info.rs +++ b/mm2src/mm2_p2p/src/application/request_response/network_info.rs @@ -6,4 +6,6 @@ use serde::{Deserialize, Serialize}; pub enum NetworkInfoRequest { /// Get MM2 version of nodes added to stats collection GetMm2Version, + /// Get UTC timestamp in seconds from the target peer + GetPeerUtcTimestamp, } diff --git a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs index 9d58da4e1e..bb069cce13 100644 --- a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs +++ b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs @@ -1,5 +1,6 @@ use common::executor::SpawnFuture; use derive_more::Display; +use futures::channel::mpsc::UnboundedSender; use futures::channel::mpsc::{channel, Receiver, Sender}; use futures::{channel::oneshot, future::{join_all, poll_fn}, @@ -29,10 +30,12 @@ use super::peers_exchange::{PeerAddresses, PeersExchange, PeersExchangeRequest, use super::ping::AdexPing; use super::request_response::{build_request_response_behaviour, PeerRequest, PeerResponse, RequestResponseBehaviour, RequestResponseSender}; +use crate::application::request_response::network_info::NetworkInfoRequest; +use crate::application::request_response::P2PRequest; use crate::network::{get_all_network_seednodes, DEFAULT_NETID}; use crate::relay_address::{RelayAddress, RelayAddressError}; use crate::swarm_runtime::SwarmRuntime; -use crate::{NetworkInfo, NetworkPorts, RequestResponseBehaviourEvent}; +use crate::{decode_message, encode_message, NetworkInfo, NetworkPorts, RequestResponseBehaviourEvent}; pub use libp2p::gossipsub::{Behaviour as Gossipsub, IdentTopic, MessageAuthenticity, MessageId, Topic, TopicHash}; pub use libp2p::gossipsub::{ConfigBuilder as GossipsubConfigBuilder, Event as GossipsubEvent, @@ -199,6 +202,26 @@ pub async fn get_relay_mesh(mut cmd_tx: AdexCmdTx) -> Vec { rx.await.expect("Tx should be present") } +async fn validate_peer_time( + peer: PeerId, + mut response_tx: UnboundedSender>, + rp_sender: RequestResponseSender, +) { + let request = P2PRequest::NetworkInfo(NetworkInfoRequest::GetPeerUtcTimestamp); + let encoded_request = encode_message(&request) + .expect("Static type `NetworkInfoRequest::GetPeerUtcTimestamp` should never fail in serialization."); + + if let PeerResponse::Ok { res } = request_one_peer(peer, encoded_request, rp_sender).await { + if let Ok(_timestamp) = decode_message::(&res) { + // TODO: get current timestamp and compare it + todo!(); + }; + }; + + // Validation failed, send peer-id to disconnect from it. + response_tx.send(Some(peer)).await.unwrap(); +} + async fn request_one_peer(peer: PeerId, req: Vec, mut request_response_tx: RequestResponseSender) -> PeerResponse { // Use the internal receiver to receive a response to this request. let (internal_response_tx, internal_response_rx) = oneshot::channel(); @@ -724,6 +747,7 @@ fn start_gossipsub( let mut announce_interval = Ticker::new_with_next(ANNOUNCE_INTERVAL, ANNOUNCE_INITIAL_DELAY); let mut listening = false; + let (timestamp_tx, mut timestamp_rx) = futures::channel::mpsc::unbounded(); let polling_fut = poll_fn(move |cx: &mut Context| { loop { match swarm.behaviour_mut().cmd_rx.poll_next_unpin(cx) { @@ -733,11 +757,28 @@ fn start_gossipsub( } } + while let Poll::Ready(Some(Some(peer_id))) = timestamp_rx.poll_next_unpin(cx) { + println!("Peer '{}' has incorrect time, disconnecting from it.", peer_id); + swarm.disconnect_peer_id(peer_id).expect("TODO"); + let peer_list: Vec<_> = swarm.connected_peers().collect(); + dbg!(peer_list); + } + loop { match swarm.poll_next_unpin(cx) { Poll::Ready(Some(event)) => { debug!("Swarm event {:?}", event); + if let SwarmEvent::ConnectionEstablished { peer_id, .. } = &event { + println!("dbg: validating time"); + let future = validate_peer_time( + *peer_id, + timestamp_tx.clone(), + swarm.behaviour().core.request_response.sender(), + ); + swarm.behaviour().spawn(future); + } + if let SwarmEvent::Behaviour(event) = event { if swarm.behaviour_mut().netid != DEFAULT_NETID { if let AdexBehaviourEvent::Floodsub(FloodsubEvent::Message(message)) = &event { From 3b69689cd52443bd4cfb36fdeedf2220ee00bf23 Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Thu, 31 Oct 2024 14:17:21 +0300 Subject: [PATCH 02/22] nit fixes Signed-off-by: onur-ozkan --- mm2src/mm2_main/src/lp_network.rs | 4 +++- mm2src/mm2_main/src/lp_ordermatch.rs | 1 - mm2src/mm2_main/src/lp_stats.rs | 15 ++++++--------- 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/mm2src/mm2_main/src/lp_network.rs b/mm2src/mm2_main/src/lp_network.rs index 08ae5f8b3e..3fbd84f9d4 100644 --- a/mm2src/mm2_main/src/lp_network.rs +++ b/mm2src/mm2_main/src/lp_network.rs @@ -228,9 +228,11 @@ fn process_p2p_request( response_channel: mm2_libp2p::AdexResponseChannel, ) -> P2PRequestResult<()> { let request = decode_message::(&request)?; + log::debug!("Got P2PRequest {:?}", request); + let result = match request { P2PRequest::Ordermatch(req) => lp_ordermatch::process_peer_request(ctx.clone(), req), - P2PRequest::NetworkInfo(req) => lp_stats::process_info_request(ctx.clone(), req), + P2PRequest::NetworkInfo(req) => lp_stats::process_info_request(ctx.clone(), req).map(Some), }; let res = match result { diff --git a/mm2src/mm2_main/src/lp_ordermatch.rs b/mm2src/mm2_main/src/lp_ordermatch.rs index 620cb79bfb..dba2139998 100644 --- a/mm2src/mm2_main/src/lp_ordermatch.rs +++ b/mm2src/mm2_main/src/lp_ordermatch.rs @@ -638,7 +638,6 @@ impl TryFromBytes for Uuid { } pub fn process_peer_request(ctx: MmArc, request: OrdermatchRequest) -> Result>, String> { - log::debug!("Got ordermatch request {:?}", request); match request { OrdermatchRequest::GetOrderbook { base, rel } => process_get_orderbook_request(ctx, base, rel), OrdermatchRequest::SyncPubkeyOrderbookState { pubkey, trie_roots } => { diff --git a/mm2src/mm2_main/src/lp_stats.rs b/mm2src/mm2_main/src/lp_stats.rs index e1ef7c844a..8bea3ecb8c 100644 --- a/mm2src/mm2_main/src/lp_stats.rs +++ b/mm2src/mm2_main/src/lp_stats.rs @@ -170,23 +170,20 @@ struct Mm2VersionRes { nodes: HashMap, } -fn process_get_version_request(ctx: MmArc) -> Result>, String> { +fn process_get_version_request(ctx: MmArc) -> Result, String> { let response = ctx.mm_version().to_string(); - let encoded = try_s!(encode_message(&response)); - Ok(Some(encoded)) + encode_message(&response).map_err(|e| e.to_string()) } -fn process_get_peer_utc_timestamp_request(_ctx: MmArc) -> Result>, String> { +fn process_get_peer_utc_timestamp_request() -> Result, String> { let timestamp = common::get_utc_timestamp(); - let encoded = try_s!(encode_message(×tamp)); - Ok(Some(encoded)) + encode_message(×tamp).map_err(|e| e.to_string()) } -pub fn process_info_request(ctx: MmArc, request: NetworkInfoRequest) -> Result>, String> { - log::debug!("Got stats request {:?}", request); +pub fn process_info_request(ctx: MmArc, request: NetworkInfoRequest) -> Result, String> { match request { NetworkInfoRequest::GetMm2Version => process_get_version_request(ctx), - NetworkInfoRequest::GetPeerUtcTimestamp => process_get_peer_utc_timestamp_request(ctx), + NetworkInfoRequest::GetPeerUtcTimestamp => process_get_peer_utc_timestamp_request(), } } From 5acfe8d2c3996bf19662c77ed7418f952fc621e0 Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Thu, 31 Oct 2024 14:30:02 +0300 Subject: [PATCH 03/22] handle time gap Signed-off-by: onur-ozkan --- mm2src/mm2_p2p/src/behaviours/atomicdex.rs | 29 ++++++++++++++++------ 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs index bb069cce13..b965b3d611 100644 --- a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs +++ b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs @@ -53,6 +53,10 @@ const ANNOUNCE_INTERVAL: Duration = Duration::from_secs(600); const ANNOUNCE_INITIAL_DELAY: Duration = Duration::from_secs(60); const CHANNEL_BUF_SIZE: usize = 1024 * 8; +/// Used in time validation logic for each peer which runs immediately after the +/// `ConnectionEstablished` event. +const MAX_TIME_GAP_FOR_CONNECTED_PEER: u64 = 20; + pub const DEPRECATED_NETID_LIST: &[u16] = &[ 7777, // TODO: keep it inaccessible until Q2 of 2024. ]; @@ -211,14 +215,25 @@ async fn validate_peer_time( let encoded_request = encode_message(&request) .expect("Static type `NetworkInfoRequest::GetPeerUtcTimestamp` should never fail in serialization."); - if let PeerResponse::Ok { res } = request_one_peer(peer, encoded_request, rp_sender).await { - if let Ok(_timestamp) = decode_message::(&res) { - // TODO: get current timestamp and compare it - todo!(); - }; - }; + match request_one_peer(peer, encoded_request, rp_sender).await { + PeerResponse::Ok { res } => { + if let Ok(timestamp) = decode_message::(&res) { + let now: u64 = common::get_utc_timestamp() + .try_into() + .expect("`common::get_utc_timestamp` returned invalid data."); + + // If time diff is in the acceptable gap, end the validation here. + if now.abs_diff(timestamp) <= MAX_TIME_GAP_FOR_CONNECTED_PEER { + response_tx.send(None).await.unwrap(); + return; + } + }; + }, + other => error!("Unexpected response `{other:?}` from peer `{peer}`"), + } - // Validation failed, send peer-id to disconnect from it. + // If the function reaches this point, this means validation has failed. + // Send the peer ID to disconnect from it. response_tx.send(Some(peer)).await.unwrap(); } From 82ae6d24e47d0c91a8ad6d6c43b9c63e66e6f37c Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Thu, 31 Oct 2024 14:38:32 +0300 Subject: [PATCH 04/22] improve logging Signed-off-by: onur-ozkan --- mm2src/mm2_p2p/src/behaviours/atomicdex.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs index b965b3d611..f6a821ba5c 100644 --- a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs +++ b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs @@ -773,10 +773,16 @@ fn start_gossipsub( } while let Poll::Ready(Some(Some(peer_id))) = timestamp_rx.poll_next_unpin(cx) { - println!("Peer '{}' has incorrect time, disconnecting from it.", peer_id); - swarm.disconnect_peer_id(peer_id).expect("TODO"); + println!("Peer `{peer_id}` is out of sync in time; disconnecting."); + + if swarm.disconnect_peer_id(peer_id).is_err() { + error!("Disconnection from `{peer_id}` failed unexpectedly, which should never happen."); + } + let peer_list: Vec<_> = swarm.connected_peers().collect(); + println!("????????????????"); dbg!(peer_list); + println!("!!!!!!!!!!!!!!!!"); } loop { @@ -785,7 +791,7 @@ fn start_gossipsub( debug!("Swarm event {:?}", event); if let SwarmEvent::ConnectionEstablished { peer_id, .. } = &event { - println!("dbg: validating time"); + info!("Validating time data for peer `{peer_id}`."); let future = validate_peer_time( *peer_id, timestamp_tx.clone(), From 5771790ad83617ae6199ac26b84ac3437d931a34 Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Thu, 31 Oct 2024 14:51:24 +0300 Subject: [PATCH 05/22] add more trackable processing logs Signed-off-by: onur-ozkan --- mm2src/mm2_p2p/src/behaviours/atomicdex.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs index f6a821ba5c..ab96446e72 100644 --- a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs +++ b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs @@ -222,8 +222,14 @@ async fn validate_peer_time( .try_into() .expect("`common::get_utc_timestamp` returned invalid data."); + let diff = now.abs_diff(timestamp); + + info!( + "Peer '{peer}' is within the acceptable time gap (20 seconds); time difference is {diff} seconds." + ); + // If time diff is in the acceptable gap, end the validation here. - if now.abs_diff(timestamp) <= MAX_TIME_GAP_FOR_CONNECTED_PEER { + if diff <= MAX_TIME_GAP_FOR_CONNECTED_PEER { response_tx.send(None).await.unwrap(); return; } @@ -234,6 +240,7 @@ async fn validate_peer_time( // If the function reaches this point, this means validation has failed. // Send the peer ID to disconnect from it. + eprintln!("Peer `{peer}` is out of sync in time; disconnecting."); response_tx.send(Some(peer)).await.unwrap(); } @@ -773,8 +780,6 @@ fn start_gossipsub( } while let Poll::Ready(Some(Some(peer_id))) = timestamp_rx.poll_next_unpin(cx) { - println!("Peer `{peer_id}` is out of sync in time; disconnecting."); - if swarm.disconnect_peer_id(peer_id).is_err() { error!("Disconnection from `{peer_id}` failed unexpectedly, which should never happen."); } From f42e7064aa19661007643217fd6b54d08f289beb Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Thu, 31 Oct 2024 14:54:42 +0300 Subject: [PATCH 06/22] improve info log and remove debugging leftover Signed-off-by: onur-ozkan --- mm2src/mm2_p2p/src/behaviours/atomicdex.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs index ab96446e72..9f7f1888bd 100644 --- a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs +++ b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs @@ -55,7 +55,7 @@ const CHANNEL_BUF_SIZE: usize = 1024 * 8; /// Used in time validation logic for each peer which runs immediately after the /// `ConnectionEstablished` event. -const MAX_TIME_GAP_FOR_CONNECTED_PEER: u64 = 20; +const MAX_TIME_GAP_FOR_CONNECTED_PEER: u64 = 30; pub const DEPRECATED_NETID_LIST: &[u16] = &[ 7777, // TODO: keep it inaccessible until Q2 of 2024. @@ -225,7 +225,7 @@ async fn validate_peer_time( let diff = now.abs_diff(timestamp); info!( - "Peer '{peer}' is within the acceptable time gap (20 seconds); time difference is {diff} seconds." + "Peer '{peer}' is within the acceptable time gap ({MAX_TIME_GAP_FOR_CONNECTED_PEER} seconds); time difference is {diff} seconds." ); // If time diff is in the acceptable gap, end the validation here. @@ -783,11 +783,6 @@ fn start_gossipsub( if swarm.disconnect_peer_id(peer_id).is_err() { error!("Disconnection from `{peer_id}` failed unexpectedly, which should never happen."); } - - let peer_list: Vec<_> = swarm.connected_peers().collect(); - println!("????????????????"); - dbg!(peer_list); - println!("!!!!!!!!!!!!!!!!"); } loop { From b693d8c481a9db85fd154f585743419473e31d6d Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Thu, 31 Oct 2024 15:19:33 +0300 Subject: [PATCH 07/22] rename `NetworkInfoRequest` to `PeerInfoRequest` Signed-off-by: onur-ozkan --- mm2src/mm2_main/src/lp_network.rs | 2 +- mm2src/mm2_main/src/lp_stats.rs | 33 +++++++++---------- .../src/application/request_response/mod.rs | 8 ++--- .../request_response/network_info.rs | 2 +- mm2src/mm2_p2p/src/behaviours/atomicdex.rs | 4 +-- 5 files changed, 21 insertions(+), 28 deletions(-) diff --git a/mm2src/mm2_main/src/lp_network.rs b/mm2src/mm2_main/src/lp_network.rs index 3fbd84f9d4..11dac7e065 100644 --- a/mm2src/mm2_main/src/lp_network.rs +++ b/mm2src/mm2_main/src/lp_network.rs @@ -232,7 +232,7 @@ fn process_p2p_request( let result = match request { P2PRequest::Ordermatch(req) => lp_ordermatch::process_peer_request(ctx.clone(), req), - P2PRequest::NetworkInfo(req) => lp_stats::process_info_request(ctx.clone(), req).map(Some), + P2PRequest::PeerInfo(req) => lp_stats::process_info_request(ctx.clone(), req).map(Some), }; let res = match result { diff --git a/mm2src/mm2_main/src/lp_stats.rs b/mm2src/mm2_main/src/lp_stats.rs index 8bea3ecb8c..7590f99608 100644 --- a/mm2src/mm2_main/src/lp_stats.rs +++ b/mm2src/mm2_main/src/lp_stats.rs @@ -7,7 +7,7 @@ use futures::lock::Mutex as AsyncMutex; use http::StatusCode; use mm2_core::mm_ctx::{from_ctx, MmArc}; use mm2_err_handle::prelude::*; -use mm2_libp2p::application::request_response::network_info::NetworkInfoRequest; +use mm2_libp2p::application::request_response::network_info::PeerInfoRequest; use mm2_libp2p::{encode_message, NetworkInfo, PeerId, RelayAddress, RelayAddressError}; use serde_json::{self as json, Value as Json}; use std::collections::{HashMap, HashSet}; @@ -180,10 +180,10 @@ fn process_get_peer_utc_timestamp_request() -> Result, String> { encode_message(×tamp).map_err(|e| e.to_string()) } -pub fn process_info_request(ctx: MmArc, request: NetworkInfoRequest) -> Result, String> { +pub fn process_info_request(ctx: MmArc, request: PeerInfoRequest) -> Result, String> { match request { - NetworkInfoRequest::GetMm2Version => process_get_version_request(ctx), - NetworkInfoRequest::GetPeerUtcTimestamp => process_get_peer_utc_timestamp_request(), + PeerInfoRequest::GetMm2Version => process_get_version_request(ctx), + PeerInfoRequest::GetPeerUtcTimestamp => process_get_peer_utc_timestamp_request(), } } @@ -302,20 +302,17 @@ async fn stat_collection_loop(ctx: MmArc, interval: f64) { let peers: Vec = peers_names.keys().cloned().collect(); let timestamp = now_sec(); - let get_versions_res = match request_peers::( - ctx.clone(), - P2PRequest::NetworkInfo(NetworkInfoRequest::GetMm2Version), - peers, - ) - .await - { - Ok(res) => res, - Err(e) => { - log::error!("Error getting nodes versions from peers: {}", e); - Timer::sleep(10.).await; - continue; - }, - }; + let get_versions_res = + match request_peers::(ctx.clone(), P2PRequest::PeerInfo(PeerInfoRequest::GetMm2Version), peers) + .await + { + Ok(res) => res, + Err(e) => { + log::error!("Error getting nodes versions from peers: {}", e); + Timer::sleep(10.).await; + continue; + }, + }; for (peer_id, response) in get_versions_res { let name = match peers_names.get(&peer_id.to_string()) { diff --git a/mm2src/mm2_p2p/src/application/request_response/mod.rs b/mm2src/mm2_p2p/src/application/request_response/mod.rs index 28da482bdc..16903857ab 100644 --- a/mm2src/mm2_p2p/src/application/request_response/mod.rs +++ b/mm2src/mm2_p2p/src/application/request_response/mod.rs @@ -12,10 +12,6 @@ use serde::{Deserialize, Serialize}; pub enum P2PRequest { /// Request for order matching. Ordermatch(ordermatch::OrdermatchRequest), - /// Request for network information from the target peer. - /// - /// TODO: This should be called `PeerInfoRequest` instead. However, renaming it - /// will introduce a breaking change in the network and is not worth it. Do this - /// renaming when there is already a breaking change in the release. - NetworkInfo(network_info::NetworkInfoRequest), + /// Request various information from the target peer. + PeerInfo(network_info::PeerInfoRequest), } diff --git a/mm2src/mm2_p2p/src/application/request_response/network_info.rs b/mm2src/mm2_p2p/src/application/request_response/network_info.rs index 4d610d932c..87455091ee 100644 --- a/mm2src/mm2_p2p/src/application/request_response/network_info.rs +++ b/mm2src/mm2_p2p/src/application/request_response/network_info.rs @@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize}; /// Wraps the different types of network information requests for the P2P request-response /// protocol. #[derive(Debug, Deserialize, Eq, PartialEq, Serialize)] -pub enum NetworkInfoRequest { +pub enum PeerInfoRequest { /// Get MM2 version of nodes added to stats collection GetMm2Version, /// Get UTC timestamp in seconds from the target peer diff --git a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs index 9f7f1888bd..36889b3e79 100644 --- a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs +++ b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs @@ -30,7 +30,7 @@ use super::peers_exchange::{PeerAddresses, PeersExchange, PeersExchangeRequest, use super::ping::AdexPing; use super::request_response::{build_request_response_behaviour, PeerRequest, PeerResponse, RequestResponseBehaviour, RequestResponseSender}; -use crate::application::request_response::network_info::NetworkInfoRequest; +use crate::application::request_response::network_info::PeerInfoRequest; use crate::application::request_response::P2PRequest; use crate::network::{get_all_network_seednodes, DEFAULT_NETID}; use crate::relay_address::{RelayAddress, RelayAddressError}; @@ -211,7 +211,7 @@ async fn validate_peer_time( mut response_tx: UnboundedSender>, rp_sender: RequestResponseSender, ) { - let request = P2PRequest::NetworkInfo(NetworkInfoRequest::GetPeerUtcTimestamp); + let request = P2PRequest::PeerInfo(PeerInfoRequest::GetPeerUtcTimestamp); let encoded_request = encode_message(&request) .expect("Static type `NetworkInfoRequest::GetPeerUtcTimestamp` should never fail in serialization."); From 92af2858acabcf99bb33d5c1ff789714c2184592 Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Mon, 4 Nov 2024 17:09:03 +0300 Subject: [PATCH 08/22] handle recently dialed peers Signed-off-by: onur-ozkan --- Cargo.lock | 25 +++++++++++-- mm2src/common/Cargo.toml | 2 +- mm2src/mm2_p2p/Cargo.toml | 2 ++ mm2src/mm2_p2p/src/behaviours/atomicdex.rs | 41 +++++++++++++++++++++- 4 files changed, 66 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f0cef862ae..6e8a158fbf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4248,6 +4248,7 @@ dependencies = [ "sha2 0.10.7", "smallvec 1.6.1", "syn 2.0.38", + "timed-map", "tokio", "void", ] @@ -5711,9 +5712,9 @@ checksum = "4c691c0e608126e00913e33f0ccf3727d5fc84573623b8d65b2df340b5201783" [[package]] name = "rustc-hash" -version = "1.1.0" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" +checksum = "583034fd73374156e66797ed8e5b0d5690409c9226b22d87cb7f19821c05d152" [[package]] name = "rustc-hex" @@ -6944,6 +6945,16 @@ dependencies = [ "time-core", ] +[[package]] +name = "timed-map" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d4e2e0c83d81123e086105a1f6389fdeefa0ba87923e42b4e747d92a5666acc" +dependencies = [ + "rustc-hash", + "web-time", +] + [[package]] name = "tiny-keccak" version = "1.4.4" @@ -7709,6 +7720,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "web3" version = "0.19.0" diff --git a/mm2src/common/Cargo.toml b/mm2src/common/Cargo.toml index 6a5395b360..6921fdb3ed 100644 --- a/mm2src/common/Cargo.toml +++ b/mm2src/common/Cargo.toml @@ -36,7 +36,7 @@ parking_lot = { version = "0.12.0", features = ["nightly"] } parking_lot_core = { version = "0.6", features = ["nightly"] } primitive-types = "0.11.1" rand = { version = "0.7", features = ["std", "small_rng"] } -rustc-hash = "1.1.0" +rustc-hash = "2.0" regex = "1" serde = "1" serde_derive = "1" diff --git a/mm2src/mm2_p2p/Cargo.toml b/mm2src/mm2_p2p/Cargo.toml index 6b7f43e7f4..a1c379976c 100644 --- a/mm2src/mm2_p2p/Cargo.toml +++ b/mm2src/mm2_p2p/Cargo.toml @@ -39,12 +39,14 @@ void = "1.0" futures-rustls = "0.24" instant = "0.1.12" libp2p = { git = "https://github.com/KomodoPlatform/rust-libp2p.git", tag = "k-0.52.4", default-features = false, features = ["dns", "identify", "floodsub", "gossipsub", "noise", "ping", "request-response", "secp256k1", "tcp", "tokio", "websocket", "macros", "yamux"] } +timed-map = { version = "1.1.0", features = ["rustc-hash"] } tokio = { version = "1.20", default-features = false } [target.'cfg(target_arch = "wasm32")'.dependencies] futures-rustls = "0.22" instant = { version = "0.1.12", features = ["wasm-bindgen"] } libp2p = { git = "https://github.com/KomodoPlatform/rust-libp2p.git", tag = "k-0.52.4", default-features = false, features = ["identify", "floodsub", "noise", "gossipsub", "ping", "request-response", "secp256k1", "wasm-ext", "wasm-ext-websocket", "macros", "yamux"] } +timed-map = { version = "1.1.0", features = ["rustc-hash", "wasm"] } [dev-dependencies] async-std = "1.6.2" diff --git a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs index 36889b3e79..ae268f1333 100644 --- a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs +++ b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs @@ -8,6 +8,7 @@ use futures::{channel::oneshot, use futures_rustls::rustls; use futures_ticker::Ticker; use instant::Duration; +use lazy_static::lazy_static; use libp2p::core::transport::Boxed as BoxedTransport; use libp2p::core::{ConnectedPoint, Endpoint}; use libp2p::floodsub::{Floodsub, FloodsubEvent, Topic as FloodsubTopic}; @@ -24,7 +25,9 @@ use std::collections::HashMap; use std::hash::{Hash, Hasher}; use std::iter; use std::net::IpAddr; +use std::sync::Mutex; use std::task::{Context, Poll}; +use timed_map::{MapKind, StdClock, TimedMap}; use super::peers_exchange::{PeerAddresses, PeersExchange, PeersExchangeRequest, PeersExchangeResponse}; use super::ping::AdexPing; @@ -35,7 +38,7 @@ use crate::application::request_response::P2PRequest; use crate::network::{get_all_network_seednodes, DEFAULT_NETID}; use crate::relay_address::{RelayAddress, RelayAddressError}; use crate::swarm_runtime::SwarmRuntime; -use crate::{decode_message, encode_message, NetworkInfo, NetworkPorts, RequestResponseBehaviourEvent}; +use crate::{decode_message, encode_message, sha256, NetworkInfo, NetworkPorts, RequestResponseBehaviourEvent}; pub use libp2p::gossipsub::{Behaviour as Gossipsub, IdentTopic, MessageAuthenticity, MessageId, Topic, TopicHash}; pub use libp2p::gossipsub::{ConfigBuilder as GossipsubConfigBuilder, Event as GossipsubEvent, @@ -57,6 +60,16 @@ const CHANNEL_BUF_SIZE: usize = 1024 * 8; /// `ConnectionEstablished` event. const MAX_TIME_GAP_FOR_CONNECTED_PEER: u64 = 30; +/// Used for storing peers in [`RECENTLY_DIALED_PEERS`]. +const DIAL_RETRY_DELAY: Duration = Duration::from_secs(60 * 5); + +type CopyableMultiaddr = [u8; 32]; + +lazy_static! { + /// Tracks recently dialed peers to avoid repeated connection attempts. + pub static ref RECENTLY_DIALED_PEERS: Mutex> = Mutex::new(TimedMap::new_with_map_kind(MapKind::FxHashMap)); +} + pub const DEPRECATED_NETID_LIST: &[u16] = &[ 7777, // TODO: keep it inaccessible until Q2 of 2024. ]; @@ -756,12 +769,21 @@ fn start_gossipsub( _ => (), } + let mut recently_dialed_peers = RECENTLY_DIALED_PEERS.lock().unwrap(); for relay in bootstrap.choose_multiple(&mut rng, mesh_n) { + if recently_dialed_peers + .insert_expirable(sha256(relay), (), DIAL_RETRY_DELAY) + .is_some() + { + continue; + } + match libp2p::Swarm::dial(&mut swarm, relay.clone()) { Ok(_) => info!("Dialed {}", relay), Err(e) => error!("Dial {:?} failed: {:?}", relay, e), } } + drop(recently_dialed_peers); let mut check_connected_relays_interval = Ticker::new_with_next(CONNECTED_RELAYS_CHECK_INTERVAL, CONNECTED_RELAYS_CHECK_INTERVAL); @@ -867,6 +889,7 @@ fn maintain_connection_to_relays(swarm: &mut AtomicDexSwarm, bootstrap_addresses .peers_exchange .get_random_peers(to_connect_num, |peer| !connected_relays.contains(peer)); + let mut recently_dialed_peers = RECENTLY_DIALED_PEERS.lock().unwrap(); // choose some random bootstrap addresses to connect if peers exchange returned not enough peers if to_connect.len() < to_connect_num { let connect_bootstrap_num = to_connect_num - to_connect.len(); @@ -876,6 +899,13 @@ fn maintain_connection_to_relays(swarm: &mut AtomicDexSwarm, bootstrap_addresses .collect::>() .choose_multiple(&mut rng, connect_bootstrap_num) { + if recently_dialed_peers + .insert_expirable(sha256(addr), (), DIAL_RETRY_DELAY) + .is_some() + { + continue; + } + if let Err(e) = libp2p::Swarm::dial(swarm, (*addr).clone()) { error!("Bootstrap addr {} dial error {}", addr, e); } @@ -886,11 +916,20 @@ fn maintain_connection_to_relays(swarm: &mut AtomicDexSwarm, bootstrap_addresses if swarm.behaviour().core.gossipsub.is_connected_to_addr(&addr) { continue; } + + if recently_dialed_peers + .insert_expirable(sha256(&addr), (), DIAL_RETRY_DELAY) + .is_some() + { + continue; + } + if let Err(e) = libp2p::Swarm::dial(swarm, addr.clone()) { error!("Peer {} address {} dial error {}", peer, addr, e); } } } + drop(recently_dialed_peers); } if connected_relays.len() > max_n { From 86eb068a7f454d19d5b0cb0a50a23f49fc98fa70 Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Mon, 4 Nov 2024 17:16:16 +0300 Subject: [PATCH 09/22] add useful logs Signed-off-by: onur-ozkan --- mm2src/mm2_p2p/src/behaviours/atomicdex.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs index ae268f1333..275256f124 100644 --- a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs +++ b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs @@ -775,6 +775,7 @@ fn start_gossipsub( .insert_expirable(sha256(relay), (), DIAL_RETRY_DELAY) .is_some() { + info!("Connection attempt was already made recently to '{relay}'. Skipping this dial attempt."); continue; } @@ -903,6 +904,7 @@ fn maintain_connection_to_relays(swarm: &mut AtomicDexSwarm, bootstrap_addresses .insert_expirable(sha256(addr), (), DIAL_RETRY_DELAY) .is_some() { + info!("Connection attempt was already made recently to '{addr}'. Skipping this dial attempt."); continue; } @@ -921,6 +923,7 @@ fn maintain_connection_to_relays(swarm: &mut AtomicDexSwarm, bootstrap_addresses .insert_expirable(sha256(&addr), (), DIAL_RETRY_DELAY) .is_some() { + info!("Connection attempt was already made recently to '{addr}'. Skipping this dial attempt."); continue; } From c1a6eafa5c92a858e667a57ea597bbebd4295d3c Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Mon, 4 Nov 2024 17:27:16 +0300 Subject: [PATCH 10/22] create function for pre-dial check Signed-off-by: onur-ozkan --- mm2src/mm2_p2p/src/behaviours/atomicdex.rs | 36 ++++++++++++---------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs index 275256f124..d1783e04bf 100644 --- a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs +++ b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs @@ -25,7 +25,7 @@ use std::collections::HashMap; use std::hash::{Hash, Hasher}; use std::iter; use std::net::IpAddr; -use std::sync::Mutex; +use std::sync::{Mutex, MutexGuard}; use std::task::{Context, Poll}; use timed_map::{MapKind, StdClock, TimedMap}; @@ -182,6 +182,22 @@ pub enum AdexBehaviourCmd { }, } +/// Determines if a dial attempt to the remote should be made. +/// +/// Returns `false` if a dial attempt to the given address has already been made, +/// in which case the caller must skip the dial attempt. +fn pre_dial_check(recently_dialed_peers: &mut MutexGuard>, addr: &Multiaddr) -> bool { + if recently_dialed_peers + .insert_expirable(sha256(addr), (), DIAL_RETRY_DELAY) + .is_some() + { + info!("Connection attempt was already made recently to '{addr}'."); + return false; + } + + true +} + /// Returns info about directly connected peers. pub async fn get_directly_connected_peers(mut cmd_tx: AdexCmdTx) -> HashMap> { let (result_tx, rx) = oneshot::channel(); @@ -771,11 +787,7 @@ fn start_gossipsub( let mut recently_dialed_peers = RECENTLY_DIALED_PEERS.lock().unwrap(); for relay in bootstrap.choose_multiple(&mut rng, mesh_n) { - if recently_dialed_peers - .insert_expirable(sha256(relay), (), DIAL_RETRY_DELAY) - .is_some() - { - info!("Connection attempt was already made recently to '{relay}'. Skipping this dial attempt."); + if !pre_dial_check(&mut recently_dialed_peers, relay) { continue; } @@ -900,11 +912,7 @@ fn maintain_connection_to_relays(swarm: &mut AtomicDexSwarm, bootstrap_addresses .collect::>() .choose_multiple(&mut rng, connect_bootstrap_num) { - if recently_dialed_peers - .insert_expirable(sha256(addr), (), DIAL_RETRY_DELAY) - .is_some() - { - info!("Connection attempt was already made recently to '{addr}'. Skipping this dial attempt."); + if !pre_dial_check(&mut recently_dialed_peers, addr) { continue; } @@ -919,11 +927,7 @@ fn maintain_connection_to_relays(swarm: &mut AtomicDexSwarm, bootstrap_addresses continue; } - if recently_dialed_peers - .insert_expirable(sha256(&addr), (), DIAL_RETRY_DELAY) - .is_some() - { - info!("Connection attempt was already made recently to '{addr}'. Skipping this dial attempt."); + if !pre_dial_check(&mut recently_dialed_peers, &addr) { continue; } From e5c284d53dd72281adc9cd20406c3a13c00b548f Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Mon, 4 Nov 2024 17:41:58 +0300 Subject: [PATCH 11/22] set max cap for timestamp channel Signed-off-by: onur-ozkan --- mm2src/mm2_p2p/src/behaviours/atomicdex.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs index d1783e04bf..d6d3aea99f 100644 --- a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs +++ b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs @@ -1,6 +1,5 @@ use common::executor::SpawnFuture; use derive_more::Display; -use futures::channel::mpsc::UnboundedSender; use futures::channel::mpsc::{channel, Receiver, Sender}; use futures::{channel::oneshot, future::{join_all, poll_fn}, @@ -237,7 +236,7 @@ pub async fn get_relay_mesh(mut cmd_tx: AdexCmdTx) -> Vec { async fn validate_peer_time( peer: PeerId, - mut response_tx: UnboundedSender>, + mut response_tx: Sender>, rp_sender: RequestResponseSender, ) { let request = P2PRequest::PeerInfo(PeerInfoRequest::GetPeerUtcTimestamp); @@ -804,7 +803,7 @@ fn start_gossipsub( let mut announce_interval = Ticker::new_with_next(ANNOUNCE_INTERVAL, ANNOUNCE_INITIAL_DELAY); let mut listening = false; - let (timestamp_tx, mut timestamp_rx) = futures::channel::mpsc::unbounded(); + let (timestamp_tx, mut timestamp_rx) = futures::channel::mpsc::channel(mesh_n_high); let polling_fut = poll_fn(move |cx: &mut Context| { loop { match swarm.behaviour_mut().cmd_rx.poll_next_unpin(cx) { From 130af670a7826baba49da778de26797c66b420e2 Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Mon, 4 Nov 2024 17:46:57 +0300 Subject: [PATCH 12/22] remove leftover Signed-off-by: onur-ozkan --- mm2src/mm2_p2p/src/behaviours/atomicdex.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs index d6d3aea99f..619f46b054 100644 --- a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs +++ b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs @@ -66,7 +66,7 @@ type CopyableMultiaddr = [u8; 32]; lazy_static! { /// Tracks recently dialed peers to avoid repeated connection attempts. - pub static ref RECENTLY_DIALED_PEERS: Mutex> = Mutex::new(TimedMap::new_with_map_kind(MapKind::FxHashMap)); + static ref RECENTLY_DIALED_PEERS: Mutex> = Mutex::new(TimedMap::new_with_map_kind(MapKind::FxHashMap)); } pub const DEPRECATED_NETID_LIST: &[u16] = &[ From 422626cd01ebfdc5c0fa924c96cf8597186850cf Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Mon, 4 Nov 2024 20:20:21 +0300 Subject: [PATCH 13/22] use `Multiaddr` as key Signed-off-by: onur-ozkan --- Cargo.lock | 4 ++-- mm2src/mm2_p2p/Cargo.toml | 4 ++-- mm2src/mm2_p2p/src/behaviours/atomicdex.rs | 16 +++++----------- 3 files changed, 9 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6e8a158fbf..62d10cbf69 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6947,9 +6947,9 @@ dependencies = [ [[package]] name = "timed-map" -version = "1.1.0" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d4e2e0c83d81123e086105a1f6389fdeefa0ba87923e42b4e747d92a5666acc" +checksum = "3b102d4d896895d697f1dff4141dff28307532dac57a376b2b5665a55b280dc6" dependencies = [ "rustc-hash", "web-time", diff --git a/mm2src/mm2_p2p/Cargo.toml b/mm2src/mm2_p2p/Cargo.toml index a1c379976c..0c4bcedecf 100644 --- a/mm2src/mm2_p2p/Cargo.toml +++ b/mm2src/mm2_p2p/Cargo.toml @@ -39,14 +39,14 @@ void = "1.0" futures-rustls = "0.24" instant = "0.1.12" libp2p = { git = "https://github.com/KomodoPlatform/rust-libp2p.git", tag = "k-0.52.4", default-features = false, features = ["dns", "identify", "floodsub", "gossipsub", "noise", "ping", "request-response", "secp256k1", "tcp", "tokio", "websocket", "macros", "yamux"] } -timed-map = { version = "1.1.0", features = ["rustc-hash"] } +timed-map = { version = "1.1.1", features = ["rustc-hash"] } tokio = { version = "1.20", default-features = false } [target.'cfg(target_arch = "wasm32")'.dependencies] futures-rustls = "0.22" instant = { version = "0.1.12", features = ["wasm-bindgen"] } libp2p = { git = "https://github.com/KomodoPlatform/rust-libp2p.git", tag = "k-0.52.4", default-features = false, features = ["identify", "floodsub", "noise", "gossipsub", "ping", "request-response", "secp256k1", "wasm-ext", "wasm-ext-websocket", "macros", "yamux"] } -timed-map = { version = "1.1.0", features = ["rustc-hash", "wasm"] } +timed-map = { version = "1.1.1", features = ["rustc-hash", "wasm"] } [dev-dependencies] async-std = "1.6.2" diff --git a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs index 619f46b054..48046c7c3b 100644 --- a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs +++ b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs @@ -37,7 +37,7 @@ use crate::application::request_response::P2PRequest; use crate::network::{get_all_network_seednodes, DEFAULT_NETID}; use crate::relay_address::{RelayAddress, RelayAddressError}; use crate::swarm_runtime::SwarmRuntime; -use crate::{decode_message, encode_message, sha256, NetworkInfo, NetworkPorts, RequestResponseBehaviourEvent}; +use crate::{decode_message, encode_message, NetworkInfo, NetworkPorts, RequestResponseBehaviourEvent}; pub use libp2p::gossipsub::{Behaviour as Gossipsub, IdentTopic, MessageAuthenticity, MessageId, Topic, TopicHash}; pub use libp2p::gossipsub::{ConfigBuilder as GossipsubConfigBuilder, Event as GossipsubEvent, @@ -62,11 +62,9 @@ const MAX_TIME_GAP_FOR_CONNECTED_PEER: u64 = 30; /// Used for storing peers in [`RECENTLY_DIALED_PEERS`]. const DIAL_RETRY_DELAY: Duration = Duration::from_secs(60 * 5); -type CopyableMultiaddr = [u8; 32]; - lazy_static! { /// Tracks recently dialed peers to avoid repeated connection attempts. - static ref RECENTLY_DIALED_PEERS: Mutex> = Mutex::new(TimedMap::new_with_map_kind(MapKind::FxHashMap)); + static ref RECENTLY_DIALED_PEERS: Mutex> = Mutex::new(TimedMap::new_with_map_kind(MapKind::FxHashMap)); } pub const DEPRECATED_NETID_LIST: &[u16] = &[ @@ -185,9 +183,9 @@ pub enum AdexBehaviourCmd { /// /// Returns `false` if a dial attempt to the given address has already been made, /// in which case the caller must skip the dial attempt. -fn pre_dial_check(recently_dialed_peers: &mut MutexGuard>, addr: &Multiaddr) -> bool { +fn pre_dial_check(recently_dialed_peers: &mut MutexGuard>, addr: &Multiaddr) -> bool { if recently_dialed_peers - .insert_expirable(sha256(addr), (), DIAL_RETRY_DELAY) + .insert_expirable(addr.clone(), (), DIAL_RETRY_DELAY) .is_some() { info!("Connection attempt was already made recently to '{addr}'."); @@ -234,11 +232,7 @@ pub async fn get_relay_mesh(mut cmd_tx: AdexCmdTx) -> Vec { rx.await.expect("Tx should be present") } -async fn validate_peer_time( - peer: PeerId, - mut response_tx: Sender>, - rp_sender: RequestResponseSender, -) { +async fn validate_peer_time(peer: PeerId, mut response_tx: Sender>, rp_sender: RequestResponseSender) { let request = P2PRequest::PeerInfo(PeerInfoRequest::GetPeerUtcTimestamp); let encoded_request = encode_message(&request) .expect("Static type `NetworkInfoRequest::GetPeerUtcTimestamp` should never fail in serialization."); From 1c7229cbc1f4c7ff260c74c352b9f78d0789f358 Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Mon, 4 Nov 2024 21:36:54 +0300 Subject: [PATCH 14/22] fix p2p tests Signed-off-by: onur-ozkan --- mm2src/mm2_p2p/src/behaviours/mod.rs | 34 ++++++++++------------------ 1 file changed, 12 insertions(+), 22 deletions(-) diff --git a/mm2src/mm2_p2p/src/behaviours/mod.rs b/mm2src/mm2_p2p/src/behaviours/mod.rs index cdfda38c8d..36436efe4e 100644 --- a/mm2src/mm2_p2p/src/behaviours/mod.rs +++ b/mm2src/mm2_p2p/src/behaviours/mod.rs @@ -106,17 +106,16 @@ mod tests { let node1_port = next_port(); let node1 = Node::spawn(node1_port, vec![], move |mut cmd_tx, event| { - let (request, response_channel) = match event { + let response_channel = match event { AdexBehaviourEvent::RequestResponse(RequestResponseBehaviourEvent::InboundRequest { request, response_channel, .. - }) => (request.req, AdexResponseChannel(response_channel)), + }) if request.req == b"test request" => AdexResponseChannel(response_channel), _ => return, }; request_received_cpy.store(true, Ordering::Relaxed); - assert_eq!(request, b"test request"); let res = AdexResponse::Ok { response: b"test response".to_vec(), @@ -157,19 +156,17 @@ mod tests { impl RequestHandler { fn handle(&mut self, mut cmd_tx: mpsc::Sender, event: AdexBehaviourEvent) { - let (request, response_channel) = match event { + let response_channel = match event { AdexBehaviourEvent::RequestResponse(RequestResponseBehaviourEvent::InboundRequest { request, response_channel, .. - }) => (request.req, AdexResponseChannel(response_channel)), + }) if request.req == b"test request" => AdexResponseChannel(response_channel), _ => return, }; self.requests += 1; - assert_eq!(request, b"test request"); - // the first time we should respond the none if self.requests == 1 { let res = AdexResponse::None; @@ -249,17 +246,16 @@ mod tests { let node1_port = next_port(); let _node1 = Node::spawn(node1_port, vec![], move |mut cmd_tx, event| { - let (request, response_channel) = match event { + let response_channel = match event { AdexBehaviourEvent::RequestResponse(RequestResponseBehaviourEvent::InboundRequest { request, response_channel, .. - }) => (request.req, AdexResponseChannel(response_channel)), + }) if request.req == b"test request" => AdexResponseChannel(response_channel), _ => return, }; request_received_cpy.store(true, Ordering::Relaxed); - assert_eq!(request, b"test request"); let res = AdexResponse::None; cmd_tx @@ -293,17 +289,15 @@ mod tests { let receiver1_port = next_port(); let receiver1 = Node::spawn(receiver1_port, vec![], move |mut cmd_tx, event| { - let (request, response_channel) = match event { + let response_channel = match event { AdexBehaviourEvent::RequestResponse(RequestResponseBehaviourEvent::InboundRequest { request, response_channel, .. - }) => (request.req, AdexResponseChannel(response_channel)), + }) if request.req == b"test request" => AdexResponseChannel(response_channel), _ => return, }; - assert_eq!(request, b"test request"); - let res = AdexResponse::None; cmd_tx .try_send(AdexBehaviourCmd::SendResponse { res, response_channel }) @@ -313,17 +307,15 @@ mod tests { let receiver2_port = next_port(); let receiver2 = Node::spawn(receiver2_port, vec![], move |mut cmd_tx, event| { - let (request, response_channel) = match event { + let response_channel = match event { AdexBehaviourEvent::RequestResponse(RequestResponseBehaviourEvent::InboundRequest { request, response_channel, .. - }) => (request.req, AdexResponseChannel(response_channel)), + }) if request.req == b"test request" => AdexResponseChannel(response_channel), _ => return, }; - assert_eq!(request, b"test request"); - let res = AdexResponse::Err { error: "test error".into(), }; @@ -335,17 +327,15 @@ mod tests { let receiver3_port = next_port(); let receiver3 = Node::spawn(receiver3_port, vec![], move |mut cmd_tx, event| { - let (request, response_channel) = match event { + let response_channel = match event { AdexBehaviourEvent::RequestResponse(RequestResponseBehaviourEvent::InboundRequest { request, response_channel, .. - }) => (request.req, AdexResponseChannel(response_channel)), + }) if request.req == b"test request" => AdexResponseChannel(response_channel), _ => return, }; - assert_eq!(request, b"test request"); - let res = AdexResponse::Ok { response: b"test response".to_vec(), }; From 0691aba681c698fd6068566656a61f09e9bf3dcb Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Tue, 5 Nov 2024 08:37:04 +0300 Subject: [PATCH 15/22] update logs Signed-off-by: onur-ozkan --- mm2src/mm2_p2p/src/behaviours/atomicdex.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs index 48046c7c3b..ff6bcc4f06 100644 --- a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs +++ b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs @@ -246,12 +246,11 @@ async fn validate_peer_time(peer: PeerId, mut response_tx: Sender let diff = now.abs_diff(timestamp); - info!( - "Peer '{peer}' is within the acceptable time gap ({MAX_TIME_GAP_FOR_CONNECTED_PEER} seconds); time difference is {diff} seconds." - ); - // If time diff is in the acceptable gap, end the validation here. if diff <= MAX_TIME_GAP_FOR_CONNECTED_PEER { + info!( + "Peer '{peer}' is within the acceptable time gap ({MAX_TIME_GAP_FOR_CONNECTED_PEER} seconds); time difference is {diff} seconds." + ); response_tx.send(None).await.unwrap(); return; } @@ -262,7 +261,7 @@ async fn validate_peer_time(peer: PeerId, mut response_tx: Sender // If the function reaches this point, this means validation has failed. // Send the peer ID to disconnect from it. - eprintln!("Peer `{peer}` is out of sync in time; disconnecting."); + eprintln!("Failed to validate the time for peer `{peer}`; disconnecting."); response_tx.send(Some(peer)).await.unwrap(); } From 36926736dd8920c4dcbe8f39c23af23651b8b8a6 Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Wed, 13 Nov 2024 08:49:00 +0300 Subject: [PATCH 16/22] rename leftovers Signed-off-by: onur-ozkan --- mm2src/mm2_main/src/lp_stats.rs | 2 +- mm2src/mm2_p2p/src/application/request_response/mod.rs | 4 ++-- .../request_response/{network_info.rs => peer_info.rs} | 0 mm2src/mm2_p2p/src/behaviours/atomicdex.rs | 4 ++-- 4 files changed, 5 insertions(+), 5 deletions(-) rename mm2src/mm2_p2p/src/application/request_response/{network_info.rs => peer_info.rs} (100%) diff --git a/mm2src/mm2_main/src/lp_stats.rs b/mm2src/mm2_main/src/lp_stats.rs index 7590f99608..7f70fc286f 100644 --- a/mm2src/mm2_main/src/lp_stats.rs +++ b/mm2src/mm2_main/src/lp_stats.rs @@ -7,7 +7,7 @@ use futures::lock::Mutex as AsyncMutex; use http::StatusCode; use mm2_core::mm_ctx::{from_ctx, MmArc}; use mm2_err_handle::prelude::*; -use mm2_libp2p::application::request_response::network_info::PeerInfoRequest; +use mm2_libp2p::application::request_response::peer_info::PeerInfoRequest; use mm2_libp2p::{encode_message, NetworkInfo, PeerId, RelayAddress, RelayAddressError}; use serde_json::{self as json, Value as Json}; use std::collections::{HashMap, HashSet}; diff --git a/mm2src/mm2_p2p/src/application/request_response/mod.rs b/mm2src/mm2_p2p/src/application/request_response/mod.rs index 16903857ab..0a4763e5d4 100644 --- a/mm2src/mm2_p2p/src/application/request_response/mod.rs +++ b/mm2src/mm2_p2p/src/application/request_response/mod.rs @@ -2,8 +2,8 @@ //! which are separate from other request types such as RPC requests or Gossipsub //! messages. -pub mod network_info; pub mod ordermatch; +pub mod peer_info; use serde::{Deserialize, Serialize}; @@ -13,5 +13,5 @@ pub enum P2PRequest { /// Request for order matching. Ordermatch(ordermatch::OrdermatchRequest), /// Request various information from the target peer. - PeerInfo(network_info::PeerInfoRequest), + PeerInfo(peer_info::PeerInfoRequest), } diff --git a/mm2src/mm2_p2p/src/application/request_response/network_info.rs b/mm2src/mm2_p2p/src/application/request_response/peer_info.rs similarity index 100% rename from mm2src/mm2_p2p/src/application/request_response/network_info.rs rename to mm2src/mm2_p2p/src/application/request_response/peer_info.rs diff --git a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs index ff6bcc4f06..94b808fe48 100644 --- a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs +++ b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs @@ -32,7 +32,7 @@ use super::peers_exchange::{PeerAddresses, PeersExchange, PeersExchangeRequest, use super::ping::AdexPing; use super::request_response::{build_request_response_behaviour, PeerRequest, PeerResponse, RequestResponseBehaviour, RequestResponseSender}; -use crate::application::request_response::network_info::PeerInfoRequest; +use crate::application::request_response::peer_info::PeerInfoRequest; use crate::application::request_response::P2PRequest; use crate::network::{get_all_network_seednodes, DEFAULT_NETID}; use crate::relay_address::{RelayAddress, RelayAddressError}; @@ -235,7 +235,7 @@ pub async fn get_relay_mesh(mut cmd_tx: AdexCmdTx) -> Vec { async fn validate_peer_time(peer: PeerId, mut response_tx: Sender>, rp_sender: RequestResponseSender) { let request = P2PRequest::PeerInfo(PeerInfoRequest::GetPeerUtcTimestamp); let encoded_request = encode_message(&request) - .expect("Static type `NetworkInfoRequest::GetPeerUtcTimestamp` should never fail in serialization."); + .expect("Static type `PeerInfoRequest::GetPeerUtcTimestamp` should never fail in serialization."); match request_one_peer(peer, encoded_request, rp_sender).await { PeerResponse::Ok { res } => { From 8487ab11f9b5c9c623ce448097c6402226fc18c7 Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Wed, 13 Nov 2024 08:58:49 +0300 Subject: [PATCH 17/22] update timing values Signed-off-by: onur-ozkan --- mm2src/mm2_main/src/lp_swap.rs | 3 ++- mm2src/mm2_p2p/src/behaviours/atomicdex.rs | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/mm2src/mm2_main/src/lp_swap.rs b/mm2src/mm2_main/src/lp_swap.rs index 7692503c18..d71cfa16c0 100644 --- a/mm2src/mm2_main/src/lp_swap.rs +++ b/mm2src/mm2_main/src/lp_swap.rs @@ -74,6 +74,7 @@ use derive_more::Display; use http::Response; use mm2_core::mm_ctx::{from_ctx, MmArc}; use mm2_err_handle::prelude::*; +use mm2_libp2p::behaviours::atomicdex::MAX_TIME_GAP_FOR_CONNECTED_PEER; use mm2_libp2p::{decode_signed, encode_and_sign, pub_sub_topic, PeerId, TopicPrefix}; use mm2_number::{BigDecimal, BigRational, MmNumber, MmNumberMultiRepr}; use mm2_state_machine::storable_state_machine::StateMachineStorage; @@ -151,7 +152,7 @@ pub const TX_HELPER_PREFIX: TopicPrefix = "txhlp"; pub(crate) const LEGACY_SWAP_TYPE: u8 = 0; pub(crate) const MAKER_SWAP_V2_TYPE: u8 = 1; pub(crate) const TAKER_SWAP_V2_TYPE: u8 = 2; -const MAX_STARTED_AT_DIFF: u64 = 60; +const MAX_STARTED_AT_DIFF: u64 = MAX_TIME_GAP_FOR_CONNECTED_PEER * 3; const NEGOTIATE_SEND_INTERVAL: f64 = 30.; diff --git a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs index 94b808fe48..b5b66af3d3 100644 --- a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs +++ b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs @@ -57,7 +57,7 @@ const CHANNEL_BUF_SIZE: usize = 1024 * 8; /// Used in time validation logic for each peer which runs immediately after the /// `ConnectionEstablished` event. -const MAX_TIME_GAP_FOR_CONNECTED_PEER: u64 = 30; +pub const MAX_TIME_GAP_FOR_CONNECTED_PEER: u64 = 20; /// Used for storing peers in [`RECENTLY_DIALED_PEERS`]. const DIAL_RETRY_DELAY: Duration = Duration::from_secs(60 * 5); From faeee43067a0243ce1ea97935e46d4a908b8b8c1 Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Wed, 13 Nov 2024 23:27:14 +0300 Subject: [PATCH 18/22] minor fixes Signed-off-by: onur-ozkan --- mm2src/mm2_p2p/src/behaviours/atomicdex.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs index b5b66af3d3..72b92698bb 100644 --- a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs +++ b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs @@ -57,6 +57,9 @@ const CHANNEL_BUF_SIZE: usize = 1024 * 8; /// Used in time validation logic for each peer which runs immediately after the /// `ConnectionEstablished` event. +/// +/// Be careful when updating this value, we have some defaults (like for swaps) +/// depending on this. pub const MAX_TIME_GAP_FOR_CONNECTED_PEER: u64 = 20; /// Used for storing peers in [`RECENTLY_DIALED_PEERS`]. @@ -248,7 +251,7 @@ async fn validate_peer_time(peer: PeerId, mut response_tx: Sender // If time diff is in the acceptable gap, end the validation here. if diff <= MAX_TIME_GAP_FOR_CONNECTED_PEER { - info!( + debug!( "Peer '{peer}' is within the acceptable time gap ({MAX_TIME_GAP_FOR_CONNECTED_PEER} seconds); time difference is {diff} seconds." ); response_tx.send(None).await.unwrap(); @@ -261,7 +264,7 @@ async fn validate_peer_time(peer: PeerId, mut response_tx: Sender // If the function reaches this point, this means validation has failed. // Send the peer ID to disconnect from it. - eprintln!("Failed to validate the time for peer `{peer}`; disconnecting."); + error!("Failed to validate the time for peer `{peer}`; disconnecting."); response_tx.send(Some(peer)).await.unwrap(); } From 1f2950ef14ac273b2ae5aa3662dacbd9d6a7cf9f Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Thu, 14 Nov 2024 08:36:42 +0300 Subject: [PATCH 19/22] update pre dial check calls Signed-off-by: onur-ozkan --- mm2src/mm2_p2p/src/behaviours/atomicdex.rs | 27 +++++++++++-------- .../mm2_p2p/src/behaviours/peers_exchange.rs | 11 ++++++-- 2 files changed, 25 insertions(+), 13 deletions(-) diff --git a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs index 72b92698bb..d1ef3cb244 100644 --- a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs +++ b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs @@ -890,27 +890,32 @@ fn maintain_connection_to_relays(swarm: &mut AtomicDexSwarm, bootstrap_addresses let mut rng = rand::thread_rng(); if connected_relays.len() < mesh_n_low { + let mut recently_dialed_peers = RECENTLY_DIALED_PEERS.lock().unwrap(); let to_connect_num = mesh_n - connected_relays.len(); - let to_connect = swarm - .behaviour_mut() - .core - .peers_exchange - .get_random_peers(to_connect_num, |peer| !connected_relays.contains(peer)); + let to_connect = + swarm + .behaviour_mut() + .core + .peers_exchange + .get_random_peers(to_connect_num, |peer, addresses| { + addresses + .iter() + .any(|addr| pre_dial_check(&mut recently_dialed_peers, addr)) + && !connected_relays.contains(peer) + }); - let mut recently_dialed_peers = RECENTLY_DIALED_PEERS.lock().unwrap(); // choose some random bootstrap addresses to connect if peers exchange returned not enough peers if to_connect.len() < to_connect_num { let connect_bootstrap_num = to_connect_num - to_connect.len(); for addr in bootstrap_addresses .iter() - .filter(|addr| !swarm.behaviour().core.gossipsub.is_connected_to_addr(addr)) + .filter(|addr| { + !swarm.behaviour().core.gossipsub.is_connected_to_addr(addr) + && pre_dial_check(&mut recently_dialed_peers, addr) + }) .collect::>() .choose_multiple(&mut rng, connect_bootstrap_num) { - if !pre_dial_check(&mut recently_dialed_peers, addr) { - continue; - } - if let Err(e) = libp2p::Swarm::dial(swarm, (*addr).clone()) { error!("Bootstrap addr {} dial error {}", addr, e); } diff --git a/mm2src/mm2_p2p/src/behaviours/peers_exchange.rs b/mm2src/mm2_p2p/src/behaviours/peers_exchange.rs index 412fa16355..1bede91995 100644 --- a/mm2src/mm2_p2p/src/behaviours/peers_exchange.rs +++ b/mm2src/mm2_p2p/src/behaviours/peers_exchange.rs @@ -330,11 +330,18 @@ impl PeersExchange { pub fn get_random_peers( &mut self, num: usize, - mut filter: impl FnMut(&PeerId) -> bool, + mut filter: impl FnMut(&PeerId, HashSet) -> bool, ) -> HashMap { let mut result = HashMap::with_capacity(num); let mut rng = rand::thread_rng(); - let peer_ids = self.known_peers.iter().filter(|peer| filter(peer)).collect::>(); + let peer_ids = self + .known_peers + .iter() + .filter(|peer| { + let addresses = self.request_response.addresses_of_peer(peer).into_iter().collect(); + filter(peer, addresses) + }) + .collect::>(); for peer_id in peer_ids.choose_multiple(&mut rng, num) { let addresses = self.request_response.addresses_of_peer(peer_id).into_iter().collect(); From 9ad57a78c9731b46dd8bb64a6c8141377b55c5c5 Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Thu, 14 Nov 2024 11:39:20 +0000 Subject: [PATCH 20/22] apply nit fixes Signed-off-by: onur-ozkan --- mm2src/mm2_main/src/lp_stats.rs | 5 +++++ mm2src/mm2_p2p/src/behaviours/atomicdex.rs | 26 +++++++++++----------- 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/mm2src/mm2_main/src/lp_stats.rs b/mm2src/mm2_main/src/lp_stats.rs index 7f70fc286f..a9b350e8a8 100644 --- a/mm2src/mm2_main/src/lp_stats.rs +++ b/mm2src/mm2_main/src/lp_stats.rs @@ -11,6 +11,7 @@ use mm2_libp2p::application::request_response::peer_info::PeerInfoRequest; use mm2_libp2p::{encode_message, NetworkInfo, PeerId, RelayAddress, RelayAddressError}; use serde_json::{self as json, Value as Json}; use std::collections::{HashMap, HashSet}; +use std::convert::TryInto; use std::sync::Arc; use crate::lp_network::{add_reserved_peer_addresses, lp_network_ports, request_peers, NetIdError, ParseAddressError, @@ -177,6 +178,10 @@ fn process_get_version_request(ctx: MmArc) -> Result, String> { fn process_get_peer_utc_timestamp_request() -> Result, String> { let timestamp = common::get_utc_timestamp(); + let timestamp: u64 = timestamp + .try_into() + .unwrap_or_else(|_| panic!("`common::get_utc_timestamp` returned invalid data: {}", timestamp)); + encode_message(×tamp).map_err(|e| e.to_string()) } diff --git a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs index d1ef3cb244..f9dedfd5dd 100644 --- a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs +++ b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs @@ -186,7 +186,10 @@ pub enum AdexBehaviourCmd { /// /// Returns `false` if a dial attempt to the given address has already been made, /// in which case the caller must skip the dial attempt. -fn pre_dial_check(recently_dialed_peers: &mut MutexGuard>, addr: &Multiaddr) -> bool { +fn check_and_mark_dialed( + recently_dialed_peers: &mut MutexGuard>, + addr: &Multiaddr, +) -> bool { if recently_dialed_peers .insert_expirable(addr.clone(), (), DIAL_RETRY_DELAY) .is_some() @@ -243,9 +246,10 @@ async fn validate_peer_time(peer: PeerId, mut response_tx: Sender match request_one_peer(peer, encoded_request, rp_sender).await { PeerResponse::Ok { res } => { if let Ok(timestamp) = decode_message::(&res) { - let now: u64 = common::get_utc_timestamp() + let now = common::get_utc_timestamp(); + let now: u64 = now .try_into() - .expect("`common::get_utc_timestamp` returned invalid data."); + .unwrap_or_else(|_| panic!("`common::get_utc_timestamp` returned invalid data: {}", now)); let diff = now.abs_diff(timestamp); @@ -782,7 +786,7 @@ fn start_gossipsub( let mut recently_dialed_peers = RECENTLY_DIALED_PEERS.lock().unwrap(); for relay in bootstrap.choose_multiple(&mut rng, mesh_n) { - if !pre_dial_check(&mut recently_dialed_peers, relay) { + if !check_and_mark_dialed(&mut recently_dialed_peers, relay) { continue; } @@ -898,10 +902,10 @@ fn maintain_connection_to_relays(swarm: &mut AtomicDexSwarm, bootstrap_addresses .core .peers_exchange .get_random_peers(to_connect_num, |peer, addresses| { - addresses - .iter() - .any(|addr| pre_dial_check(&mut recently_dialed_peers, addr)) - && !connected_relays.contains(peer) + !connected_relays.contains(peer) + && addresses + .iter() + .any(|addr| check_and_mark_dialed(&mut recently_dialed_peers, addr)) }); // choose some random bootstrap addresses to connect if peers exchange returned not enough peers @@ -911,7 +915,7 @@ fn maintain_connection_to_relays(swarm: &mut AtomicDexSwarm, bootstrap_addresses .iter() .filter(|addr| { !swarm.behaviour().core.gossipsub.is_connected_to_addr(addr) - && pre_dial_check(&mut recently_dialed_peers, addr) + && check_and_mark_dialed(&mut recently_dialed_peers, addr) }) .collect::>() .choose_multiple(&mut rng, connect_bootstrap_num) @@ -927,10 +931,6 @@ fn maintain_connection_to_relays(swarm: &mut AtomicDexSwarm, bootstrap_addresses continue; } - if !pre_dial_check(&mut recently_dialed_peers, &addr) { - continue; - } - if let Err(e) = libp2p::Swarm::dial(swarm, addr.clone()) { error!("Peer {} address {} dial error {}", peer, addr, e); } From 3b18529c600424ee1bccc1deb28c7e8611b9c8c8 Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Tue, 10 Dec 2024 17:31:20 +0300 Subject: [PATCH 21/22] don't update existing expiries Signed-off-by: onur-ozkan --- mm2src/mm2_p2p/src/behaviours/atomicdex.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs index f9dedfd5dd..6efba6afca 100644 --- a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs +++ b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs @@ -190,14 +190,13 @@ fn check_and_mark_dialed( recently_dialed_peers: &mut MutexGuard>, addr: &Multiaddr, ) -> bool { - if recently_dialed_peers - .insert_expirable(addr.clone(), (), DIAL_RETRY_DELAY) - .is_some() - { + if recently_dialed_peers.get(addr).is_some() { info!("Connection attempt was already made recently to '{addr}'."); return false; } + recently_dialed_peers.insert_expirable_unchecked(addr.clone(), (), DIAL_RETRY_DELAY); + true } From 7418211845e0d5f30754297c1558fa008dca9b4d Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Tue, 17 Dec 2024 15:19:02 +0300 Subject: [PATCH 22/22] revert breakage Signed-off-by: onur-ozkan --- mm2src/mm2_main/src/lp_network.rs | 2 +- mm2src/mm2_main/src/lp_stats.rs | 33 ++++++++++--------- .../src/application/request_response/mod.rs | 10 ++++-- .../{peer_info.rs => network_info.rs} | 2 +- mm2src/mm2_p2p/src/behaviours/atomicdex.rs | 12 +++++-- 5 files changed, 36 insertions(+), 23 deletions(-) rename mm2src/mm2_p2p/src/application/request_response/{peer_info.rs => network_info.rs} (92%) diff --git a/mm2src/mm2_main/src/lp_network.rs b/mm2src/mm2_main/src/lp_network.rs index 58b235e4eb..6ed8719fa5 100644 --- a/mm2src/mm2_main/src/lp_network.rs +++ b/mm2src/mm2_main/src/lp_network.rs @@ -242,7 +242,7 @@ fn process_p2p_request( let result = match request { P2PRequest::Ordermatch(req) => lp_ordermatch::process_peer_request(ctx.clone(), req), - P2PRequest::PeerInfo(req) => lp_stats::process_info_request(ctx.clone(), req).map(Some), + P2PRequest::NetworkInfo(req) => lp_stats::process_info_request(ctx.clone(), req).map(Some), }; let res = match result { diff --git a/mm2src/mm2_main/src/lp_stats.rs b/mm2src/mm2_main/src/lp_stats.rs index a9b350e8a8..3aedc1cb5c 100644 --- a/mm2src/mm2_main/src/lp_stats.rs +++ b/mm2src/mm2_main/src/lp_stats.rs @@ -7,7 +7,7 @@ use futures::lock::Mutex as AsyncMutex; use http::StatusCode; use mm2_core::mm_ctx::{from_ctx, MmArc}; use mm2_err_handle::prelude::*; -use mm2_libp2p::application::request_response::peer_info::PeerInfoRequest; +use mm2_libp2p::application::request_response::network_info::NetworkInfoRequest; use mm2_libp2p::{encode_message, NetworkInfo, PeerId, RelayAddress, RelayAddressError}; use serde_json::{self as json, Value as Json}; use std::collections::{HashMap, HashSet}; @@ -185,10 +185,10 @@ fn process_get_peer_utc_timestamp_request() -> Result, String> { encode_message(×tamp).map_err(|e| e.to_string()) } -pub fn process_info_request(ctx: MmArc, request: PeerInfoRequest) -> Result, String> { +pub fn process_info_request(ctx: MmArc, request: NetworkInfoRequest) -> Result, String> { match request { - PeerInfoRequest::GetMm2Version => process_get_version_request(ctx), - PeerInfoRequest::GetPeerUtcTimestamp => process_get_peer_utc_timestamp_request(), + NetworkInfoRequest::GetMm2Version => process_get_version_request(ctx), + NetworkInfoRequest::GetPeerUtcTimestamp => process_get_peer_utc_timestamp_request(), } } @@ -307,17 +307,20 @@ async fn stat_collection_loop(ctx: MmArc, interval: f64) { let peers: Vec = peers_names.keys().cloned().collect(); let timestamp = now_sec(); - let get_versions_res = - match request_peers::(ctx.clone(), P2PRequest::PeerInfo(PeerInfoRequest::GetMm2Version), peers) - .await - { - Ok(res) => res, - Err(e) => { - log::error!("Error getting nodes versions from peers: {}", e); - Timer::sleep(10.).await; - continue; - }, - }; + let get_versions_res = match request_peers::( + ctx.clone(), + P2PRequest::NetworkInfo(NetworkInfoRequest::GetMm2Version), + peers, + ) + .await + { + Ok(res) => res, + Err(e) => { + log::error!("Error getting nodes versions from peers: {}", e); + Timer::sleep(10.).await; + continue; + }, + }; for (peer_id, response) in get_versions_res { let name = match peers_names.get(&peer_id.to_string()) { diff --git a/mm2src/mm2_p2p/src/application/request_response/mod.rs b/mm2src/mm2_p2p/src/application/request_response/mod.rs index 0a4763e5d4..28da482bdc 100644 --- a/mm2src/mm2_p2p/src/application/request_response/mod.rs +++ b/mm2src/mm2_p2p/src/application/request_response/mod.rs @@ -2,8 +2,8 @@ //! which are separate from other request types such as RPC requests or Gossipsub //! messages. +pub mod network_info; pub mod ordermatch; -pub mod peer_info; use serde::{Deserialize, Serialize}; @@ -12,6 +12,10 @@ use serde::{Deserialize, Serialize}; pub enum P2PRequest { /// Request for order matching. Ordermatch(ordermatch::OrdermatchRequest), - /// Request various information from the target peer. - PeerInfo(peer_info::PeerInfoRequest), + /// Request for network information from the target peer. + /// + /// TODO: This should be called `PeerInfoRequest` instead. However, renaming it + /// will introduce a breaking change in the network and is not worth it. Do this + /// renaming when there is already a breaking change in the release. + NetworkInfo(network_info::NetworkInfoRequest), } diff --git a/mm2src/mm2_p2p/src/application/request_response/peer_info.rs b/mm2src/mm2_p2p/src/application/request_response/network_info.rs similarity index 92% rename from mm2src/mm2_p2p/src/application/request_response/peer_info.rs rename to mm2src/mm2_p2p/src/application/request_response/network_info.rs index 87455091ee..4d610d932c 100644 --- a/mm2src/mm2_p2p/src/application/request_response/peer_info.rs +++ b/mm2src/mm2_p2p/src/application/request_response/network_info.rs @@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize}; /// Wraps the different types of network information requests for the P2P request-response /// protocol. #[derive(Debug, Deserialize, Eq, PartialEq, Serialize)] -pub enum PeerInfoRequest { +pub enum NetworkInfoRequest { /// Get MM2 version of nodes added to stats collection GetMm2Version, /// Get UTC timestamp in seconds from the target peer diff --git a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs index 6efba6afca..943ad08126 100644 --- a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs +++ b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs @@ -32,7 +32,7 @@ use super::peers_exchange::{PeerAddresses, PeersExchange, PeersExchangeRequest, use super::ping::AdexPing; use super::request_response::{build_request_response_behaviour, PeerRequest, PeerResponse, RequestResponseBehaviour, RequestResponseSender}; -use crate::application::request_response::peer_info::PeerInfoRequest; +use crate::application::request_response::network_info::NetworkInfoRequest; use crate::application::request_response::P2PRequest; use crate::network::{get_all_network_seednodes, DEFAULT_NETID}; use crate::relay_address::{RelayAddress, RelayAddressError}; @@ -238,7 +238,7 @@ pub async fn get_relay_mesh(mut cmd_tx: AdexCmdTx) -> Vec { } async fn validate_peer_time(peer: PeerId, mut response_tx: Sender>, rp_sender: RequestResponseSender) { - let request = P2PRequest::PeerInfo(PeerInfoRequest::GetPeerUtcTimestamp); + let request = P2PRequest::NetworkInfo(NetworkInfoRequest::GetPeerUtcTimestamp); let encoded_request = encode_message(&request) .expect("Static type `PeerInfoRequest::GetPeerUtcTimestamp` should never fail in serialization."); @@ -262,7 +262,13 @@ async fn validate_peer_time(peer: PeerId, mut response_tx: Sender } }; }, - other => error!("Unexpected response `{other:?}` from peer `{peer}`"), + other => { + error!("Unexpected response `{other:?}` from peer `{peer}`"); + // TODO: Ideally, we should send `Some(peer)` to end the connection, + // but we don't want to cause a breaking change yet. + response_tx.send(None).await.unwrap(); + return; + }, } // If the function reaches this point, this means validation has failed.