Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 15 additions & 3 deletions mm2src/mm2_main/src/lp_native_dex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,20 @@ use mm2_core::mm_ctx::{MmArc, MmCtx};
use mm2_err_handle::common_errors::InternalError;
use mm2_err_handle::prelude::*;
use mm2_event_stream::behaviour::{EventBehaviour, EventInitStatus};
use mm2_libp2p::behaviours::atomicdex::DEPRECATED_NETID_LIST;
use mm2_libp2p::behaviours::atomicdex::{GossipsubConfig, DEPRECATED_NETID_LIST};
use mm2_libp2p::{spawn_gossipsub, AdexBehaviourError, NodeType, RelayAddress, RelayAddressError, SeedNodeInfo,
SwarmRuntime, WssCerts};
use mm2_metrics::mm_gauge;
use mm2_net::network_event::NetworkEvent;
use mm2_net::p2p::P2PContext;
use rpc_task::RpcTaskError;
use serde_json::{self as json};
use std::fs;
use std::convert::TryInto;
use std::io;
use std::path::PathBuf;
use std::str;
use std::time::Duration;
use std::{fs, usize};

#[cfg(not(target_arch = "wasm32"))]
use crate::mm2::database::init_and_migrate_sql_db;
Expand Down Expand Up @@ -587,7 +588,18 @@ pub async fn init_p2p(ctx: MmArc) -> P2PResult<()> {
};

let spawner = SwarmRuntime::new(ctx.spawner());
let spawn_result = spawn_gossipsub(netid, force_p2p_key, spawner, seednodes, node_type, move |swarm| {
let max_num_streams: usize = ctx.conf["max_concurrent_connections"]
.as_u64()
.unwrap_or(512)
.try_into()
.unwrap_or(usize::MAX);

let mut gossipsub_config = GossipsubConfig::new(netid, spawner, node_type);
gossipsub_config.to_dial(seednodes);
gossipsub_config.force_key(force_p2p_key);
gossipsub_config.max_num_streams(max_num_streams);

let spawn_result = spawn_gossipsub(gossipsub_config, move |swarm| {
let behaviour = swarm.behaviour();
mm_gauge!(
ctx_on_poll.metrics,
Expand Down
115 changes: 85 additions & 30 deletions mm2src/mm2_p2p/src/behaviours/atomicdex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -591,33 +591,32 @@ type AtomicDexSwarm = Swarm<AtomicDexBehaviour>;
/// `panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime'`.
#[allow(clippy::too_many_arguments)]
fn start_gossipsub(
netid: u16,
force_key: Option<[u8; 32]>,
runtime: SwarmRuntime,
to_dial: Vec<RelayAddress>,
node_type: NodeType,
config: GossipsubConfig,
on_poll: impl Fn(&AtomicDexSwarm) + Send + 'static,
) -> Result<(Sender<AdexBehaviourCmd>, AdexEventRx, PeerId), AdexBehaviourError> {
let i_am_relay = node_type.is_relay();
let i_am_relay = config.node_type.is_relay();
let mut rng = rand::thread_rng();
let local_key = generate_ed25519_keypair(&mut rng, force_key);
let local_key = generate_ed25519_keypair(&mut rng, config.force_key);
let local_peer_id = PeerId::from(local_key.public());
info!("Local peer id: {:?}", local_peer_id);

let noise_config = noise::Config::new(&local_key).expect("Signing libp2p-noise static DH keypair failed.");

let network_info = node_type.to_network_info();
let network_info = config.node_type.to_network_info();
info!("Network information: {:?}", network_info);

let transport = match network_info {
NetworkInfo::InMemory => build_memory_transport(noise_config),
NetworkInfo::Distributed { .. } => build_dns_ws_transport(noise_config, node_type.wss_certs()),
NetworkInfo::InMemory => build_memory_transport(noise_config, config.max_num_streams),
NetworkInfo::Distributed { .. } => {
build_dns_ws_transport(noise_config, config.node_type.wss_certs(), config.max_num_streams)
},
};

let (cmd_tx, cmd_rx) = channel(CHANNEL_BUF_SIZE);
let (event_tx, event_rx) = channel(CHANNEL_BUF_SIZE);

let bootstrap = to_dial
let bootstrap = config
.to_dial
.into_iter()
.map(|addr| addr.try_to_multiaddr(network_info))
.collect::<Result<Vec<Multiaddr>, _>>()?;
Expand Down Expand Up @@ -654,13 +653,13 @@ fn start_gossipsub(
let mut gossipsub = Gossipsub::new(MessageAuthenticity::Author(local_peer_id), gossipsub_config)
.map_err(|e| AdexBehaviourError::InitializationError(e.to_owned()))?;

let floodsub = Floodsub::new(local_peer_id, netid != DEFAULT_NETID);
let floodsub = Floodsub::new(local_peer_id, config.netid != DEFAULT_NETID);

let mut peers_exchange = PeersExchange::new(network_info);
if !network_info.in_memory() {
// Please note WASM nodes don't support `PeersExchange` currently,
// so `get_all_network_seednodes` returns an empty list.
for (peer_id, addr, _domain) in get_all_network_seednodes(netid) {
for (peer_id, addr, _domain) in get_all_network_seednodes(config.netid) {
let multiaddr = addr.try_to_multiaddr(network_info)?;
peers_exchange.add_peer_addresses_to_known_peers(&peer_id, iter::once(multiaddr).collect());
if peer_id != local_peer_id {
Expand All @@ -686,12 +685,13 @@ fn start_gossipsub(
let adex_behavior = AtomicDexBehaviour {
core: core_behaviour,
event_tx,
runtime: runtime.clone(),
runtime: config.runtime.clone(),
cmd_rx,
netid,
netid: config.netid,
};

libp2p::swarm::SwarmBuilder::with_executor(transport, adex_behavior, local_peer_id, runtime.clone()).build()
libp2p::swarm::SwarmBuilder::with_executor(transport, adex_behavior, local_peer_id, config.runtime.clone())
.build()
};

swarm
Expand All @@ -700,7 +700,7 @@ fn start_gossipsub(
.floodsub
.subscribe(FloodsubTopic::new(PEERS_TOPIC.to_owned()));

match node_type {
match config.node_type {
NodeType::Relay {
ip,
network_ports,
Expand Down Expand Up @@ -793,7 +793,7 @@ fn start_gossipsub(
Poll::Pending
});

runtime.spawn(polling_fut.then(|_| futures::future::ready(())));
config.runtime.spawn(polling_fut.then(|_| futures::future::ready(())));
Ok((cmd_tx, event_rx, local_peer_id))
}

Expand Down Expand Up @@ -886,16 +886,19 @@ fn announce_my_addresses(swarm: &mut AtomicDexSwarm) {
fn build_dns_ws_transport(
noise_keys: noise::Config,
_wss_certs: Option<&WssCerts>,
max_num_streams: usize,
) -> BoxedTransport<(PeerId, libp2p::core::muxing::StreamMuxerBox)> {
let websocket = libp2p::wasm_ext::ffi::websocket_transport();
let transport = libp2p::wasm_ext::ExtTransport::new(websocket);
upgrade_transport(transport, noise_keys)

upgrade_transport(transport, noise_keys, max_num_streams)
}

#[cfg(not(target_arch = "wasm32"))]
fn build_dns_ws_transport(
noise_keys: noise::Config,
wss_certs: Option<&WssCerts>,
max_num_streams: usize,
) -> BoxedTransport<(PeerId, libp2p::core::muxing::StreamMuxerBox)> {
use libp2p::websocket::tls as libp2p_tls;

Expand Down Expand Up @@ -928,18 +931,22 @@ fn build_dns_ws_transport(
.unwrap();

let transport = dns_tcp.or_transport(ws_dns_tcp);
upgrade_transport(transport, noise_keys)
upgrade_transport(transport, noise_keys, max_num_streams)
}

fn build_memory_transport(noise_keys: noise::Config) -> BoxedTransport<(PeerId, libp2p::core::muxing::StreamMuxerBox)> {
fn build_memory_transport(
noise_keys: noise::Config,
max_num_streams: usize,
) -> BoxedTransport<(PeerId, libp2p::core::muxing::StreamMuxerBox)> {
let transport = libp2p::core::transport::MemoryTransport::default();
upgrade_transport(transport, noise_keys)
upgrade_transport(transport, noise_keys, max_num_streams)
}

/// Set up an encrypted Transport over the Mplex protocol.
fn upgrade_transport<T>(
transport: T,
noise_config: noise::Config,
max_num_streams: usize,
) -> BoxedTransport<(PeerId, libp2p::core::muxing::StreamMuxerBox)>
where
T: Transport + Send + Sync + 'static + std::marker::Unpin,
Expand All @@ -948,10 +955,13 @@ where
T::Dial: Send,
T::Error: Send + Sync + 'static,
{
let mut yamux_cfg = libp2p::yamux::Config::default();
yamux_cfg.set_max_num_streams(max_num_streams);

transport
.upgrade(libp2p::core::upgrade::Version::V1)
.authenticate(noise_config)
.multiplex(libp2p::yamux::Config::default())
.multiplex(yamux_cfg)
.timeout(std::time::Duration::from_secs(20))
.map(|(peer, muxer), _| (peer, libp2p::core::muxing::StreamMuxerBox::new(muxer)))
.boxed()
Expand Down Expand Up @@ -1024,24 +1034,69 @@ impl NetworkBehaviour for AtomicDexBehaviour {
}
}

pub struct GossipsubConfig {
netid: u16,
force_key: Option<[u8; 32]>,
runtime: SwarmRuntime,
to_dial: Vec<RelayAddress>,
node_type: NodeType,
max_num_streams: usize,
}

impl GossipsubConfig {
#[cfg(test)]
pub(crate) fn new_for_tests(runtime: SwarmRuntime, to_dial: Vec<RelayAddress>, node_type: NodeType) -> Self {
GossipsubConfig {
netid: 333,
force_key: None,
runtime,
to_dial,
node_type,
max_num_streams: 128,
}
}

pub fn new(netid: u16, runtime: SwarmRuntime, node_type: NodeType) -> Self {
GossipsubConfig {
netid,
force_key: None,
runtime,
to_dial: vec![],
node_type,
max_num_streams: 512,
}
}

pub fn to_dial(&mut self, to_dial: Vec<RelayAddress>) -> &mut Self {
self.to_dial = to_dial;
self
}

pub fn force_key(&mut self, force_key: Option<[u8; 32]>) -> &mut Self {
self.force_key = force_key;
self
}

pub fn max_num_streams(&mut self, max_num_streams: usize) -> &mut Self {
self.max_num_streams = max_num_streams;
self
}
}

/// Creates and spawns new AdexBehaviour Swarm returning:
/// 1. tx to send control commands
/// 2. rx emitting gossip events to processing side
/// 3. our peer_id
/// 4. abort handle to stop the P2P processing fut.
pub async fn spawn_gossipsub(
netid: u16,
force_key: Option<[u8; 32]>,
runtime: SwarmRuntime,
to_dial: Vec<RelayAddress>,
node_type: NodeType,
config: GossipsubConfig,
on_poll: impl Fn(&AtomicDexSwarm) + Send + 'static,
) -> Result<(Sender<AdexBehaviourCmd>, AdexEventRx, PeerId), AdexBehaviourError> {
let (result_tx, result_rx) = oneshot::channel();

let runtime_c = runtime.clone();
let runtime_c = config.runtime.clone();
let fut = async move {
let result = start_gossipsub(netid, force_key, runtime, to_dial, node_type, on_poll);
let result = start_gossipsub(config, on_poll);
result_tx.send(result).unwrap();
};

Expand Down
10 changes: 7 additions & 3 deletions mm2src/mm2_p2p/src/behaviours/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ mod tests {
use crate::{spawn_gossipsub, AdexBehaviourCmd, AdexBehaviourEvent, AdexResponse, AdexResponseChannel, NetworkInfo,
NetworkPorts, NodeType, RelayAddress, RequestResponseBehaviourEvent, SwarmRuntime};

use super::atomicdex::GossipsubConfig;

static TEST_LISTEN_PORT: AtomicU64 = AtomicU64::new(1);

lazy_static! {
Expand All @@ -44,9 +46,11 @@ mod tests {
let spawner = SwarmRuntime::new(SYSTEM.weak_spawner());
let node_type = NodeType::RelayInMemory { port };
let seednodes = seednodes.into_iter().map(RelayAddress::Memory).collect();
let (cmd_tx, mut event_rx, peer_id) = spawn_gossipsub(333, None, spawner, seednodes, node_type, |_| {})
.await
.expect("Error spawning AdexBehaviour");

let (cmd_tx, mut event_rx, peer_id) =
spawn_gossipsub(GossipsubConfig::new_for_tests(spawner, seednodes, node_type), |_| {})
.await
.expect("Error spawning AdexBehaviour");

// spawn a response future
let cmd_tx_fut = cmd_tx.clone();
Expand Down