Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 28 additions & 37 deletions mm2src/lp_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ use common::mm_ctx::{MmArc, MmWeak};
use common::mm_error::prelude::*;
use common::mm_metrics::{ClockOps, MetricsOps};
use derive_more::Display;
use futures::{channel::oneshot, lock::Mutex as AsyncMutex, StreamExt};
use futures::{channel::oneshot, StreamExt};
use mm2_libp2p::atomicdex_behaviour::{AdexBehaviourCmd, AdexBehaviourEvent, AdexCmdTx, AdexEventRx, AdexResponse,
AdexResponseChannel};
use mm2_libp2p::peers_exchange::PeerAddresses;
use mm2_libp2p::{decode_message, encode_message, GossipsubMessage, MessageId, NetworkPorts, PeerId, TOPIC_SEPARATOR};
#[cfg(test)] use mocktopus::macros::*;
use parking_lot::Mutex as PaMutex;
use serde::de;
use std::net::ToSocketAddrs;
use std::sync::Arc;
Expand Down Expand Up @@ -62,14 +63,14 @@ pub enum P2PRequest {

pub struct P2PContext {
/// Using Mutex helps to prevent cloning which can actually result to channel being unbounded in case of using 1 tx clone per 1 message.
pub cmd_tx: AsyncMutex<AdexCmdTx>,
pub cmd_tx: PaMutex<AdexCmdTx>,
}

#[cfg_attr(test, mockable)]
impl P2PContext {
pub fn new(cmd_tx: AdexCmdTx) -> Self {
P2PContext {
cmd_tx: AsyncMutex::new(cmd_tx),
cmd_tx: PaMutex::new(cmd_tx),
}
}

Expand Down Expand Up @@ -103,7 +104,7 @@ pub async fn p2p_event_process_loop(ctx: MmWeak, mut rx: AdexEventRx, i_am_relay
request,
response_channel,
}) => {
if let Err(e) = process_p2p_request(ctx, peer_id, request, response_channel).await {
if let Err(e) = process_p2p_request(ctx, peer_id, request, response_channel) {
log::error!("Error on process P2P request: {:?}", e);
}
},
Expand Down Expand Up @@ -157,16 +158,16 @@ async fn process_p2p_message(
}
}

async fn process_p2p_request(
fn process_p2p_request(
ctx: MmArc,
_peer_id: PeerId,
request: Vec<u8>,
response_channel: AdexResponseChannel,
) -> P2PRequestResult<()> {
let request = decode_message::<P2PRequest>(&request)?;
let result = match request {
P2PRequest::Ordermatch(req) => lp_ordermatch::process_peer_request(ctx.clone(), req).await,
P2PRequest::NetworkInfo(req) => lp_stats::process_info_request(ctx.clone(), req).await,
P2PRequest::Ordermatch(req) => lp_ordermatch::process_peer_request(ctx.clone(), req),
P2PRequest::NetworkInfo(req) => lp_stats::process_info_request(ctx.clone(), req),
};

let res = match result {
Expand All @@ -180,32 +181,29 @@ async fn process_p2p_request(
p2p_ctx
.cmd_tx
.lock()
.await
.try_send(cmd)
.map_to_mm(|e| P2PRequestError::SendError(e.to_string()))?;
Ok(())
}

pub fn broadcast_p2p_msg(ctx: &MmArc, topics: Vec<String>, msg: Vec<u8>) {
let ctx = ctx.clone();
spawn(async move {
let cmd = AdexBehaviourCmd::PublishMsg { topics, msg };
let p2p_ctx = P2PContext::fetch_from_mm_arc(&ctx);
if let Err(e) = p2p_ctx.cmd_tx.lock().await.try_send(cmd) {
log::error!("broadcast_p2p_msg cmd_tx.send error {:?}", e);
};
});
let cmd = AdexBehaviourCmd::PublishMsg { topics, msg };
let p2p_ctx = P2PContext::fetch_from_mm_arc(&ctx);
if let Err(e) = p2p_ctx.cmd_tx.lock().try_send(cmd) {
log::error!("broadcast_p2p_msg cmd_tx.send error {:?}", e);
};
}

/// Subscribe to the given `topic`.
///
/// # Safety
///
/// The function locks the [`MmCtx::p2p_ctx`] mutex.
pub async fn subscribe_to_topic(ctx: &MmArc, topic: String) {
pub fn subscribe_to_topic(ctx: &MmArc, topic: String) {
let p2p_ctx = P2PContext::fetch_from_mm_arc(ctx);
let cmd = AdexBehaviourCmd::Subscribe { topic };
if let Err(e) = p2p_ctx.cmd_tx.lock().await.try_send(cmd) {
if let Err(e) = p2p_ctx.cmd_tx.lock().try_send(cmd) {
log::error!("subscribe_to_topic cmd_tx.send error {:?}", e);
};
}
Expand All @@ -225,7 +223,6 @@ pub async fn request_any_relay<T: de::DeserializeOwned>(
p2p_ctx
.cmd_tx
.lock()
.await
.try_send(cmd)
.map_to_mm(|e| P2PRequestError::SendError(e.to_string()))?;
match response_rx
Expand Down Expand Up @@ -262,7 +259,6 @@ pub async fn request_relays<T: de::DeserializeOwned>(
p2p_ctx
.cmd_tx
.lock()
.await
.try_send(cmd)
.map_to_mm(|e| P2PRequestError::SendError(e.to_string()))?;
let responses = response_rx
Expand All @@ -288,7 +284,6 @@ pub async fn request_peers<T: de::DeserializeOwned>(
p2p_ctx
.cmd_tx
.lock()
.await
.try_send(cmd)
.map_to_mm(|e| P2PRequestError::SendError(e.to_string()))?;
let responses = response_rx
Expand Down Expand Up @@ -339,27 +334,23 @@ fn parse_peers_responses<T: de::DeserializeOwned>(

pub fn propagate_message(ctx: &MmArc, message_id: MessageId, propagation_source: PeerId) {
let ctx = ctx.clone();
spawn(async move {
let p2p_ctx = P2PContext::fetch_from_mm_arc(&ctx);
let cmd = AdexBehaviourCmd::PropagateMessage {
message_id,
propagation_source,
};
if let Err(e) = p2p_ctx.cmd_tx.lock().await.try_send(cmd) {
log::error!("propagate_message cmd_tx.send error {:?}", e);
};
});
let p2p_ctx = P2PContext::fetch_from_mm_arc(&ctx);
let cmd = AdexBehaviourCmd::PropagateMessage {
message_id,
propagation_source,
};
if let Err(e) = p2p_ctx.cmd_tx.lock().try_send(cmd) {
log::error!("propagate_message cmd_tx.send error {:?}", e);
};
}

pub fn add_reserved_peer_addresses(ctx: &MmArc, peer: PeerId, addresses: PeerAddresses) {
let ctx = ctx.clone();
spawn(async move {
let p2p_ctx = P2PContext::fetch_from_mm_arc(&ctx);
let cmd = AdexBehaviourCmd::AddReservedPeer { peer, addresses };
if let Err(e) = p2p_ctx.cmd_tx.lock().await.try_send(cmd) {
log::error!("add_reserved_peer_addresses cmd_tx.send error {:?}", e);
};
});
let p2p_ctx = P2PContext::fetch_from_mm_arc(&ctx);
let cmd = AdexBehaviourCmd::AddReservedPeer { peer, addresses };
if let Err(e) = p2p_ctx.cmd_tx.lock().try_send(cmd) {
log::error!("add_reserved_peer_addresses cmd_tx.send error {:?}", e);
};
}

#[derive(Debug, Display)]
Expand Down
Loading