diff --git a/Cargo.lock b/Cargo.lock index 27a3eee163..d9b14e57b4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4451,7 +4451,6 @@ dependencies = [ "futures 0.3.28", "futures-rustls 0.21.1", "gstuff", - "h2", "hash-db", "hash256-std-hasher", "hex 0.4.3", diff --git a/mm2src/mm2_main/src/lp_native_dex.rs b/mm2src/mm2_main/src/lp_native_dex.rs index 22a6cd3d4e..b5e9ef131a 100644 --- a/mm2src/mm2_main/src/lp_native_dex.rs +++ b/mm2src/mm2_main/src/lp_native_dex.rs @@ -29,7 +29,7 @@ use mm2_core::mm_ctx::{MmArc, MmCtx}; use mm2_err_handle::common_errors::InternalError; use mm2_err_handle::prelude::*; use mm2_event_stream::behaviour::{EventBehaviour, EventInitStatus}; -use mm2_libp2p::behaviours::atomicdex::DEPRECATED_NETID_LIST; +use mm2_libp2p::behaviours::atomicdex::{GossipsubConfig, DEPRECATED_NETID_LIST}; use mm2_libp2p::{spawn_gossipsub, AdexBehaviourError, NodeType, RelayAddress, RelayAddressError, SeedNodeInfo, SwarmRuntime, WssCerts}; use mm2_metrics::mm_gauge; @@ -37,11 +37,12 @@ use mm2_net::network_event::NetworkEvent; use mm2_net::p2p::P2PContext; use rpc_task::RpcTaskError; use serde_json::{self as json}; -use std::fs; +use std::convert::TryInto; use std::io; use std::path::PathBuf; use std::str; use std::time::Duration; +use std::{fs, usize}; #[cfg(not(target_arch = "wasm32"))] use crate::mm2::database::init_and_migrate_sql_db; @@ -587,7 +588,18 @@ pub async fn init_p2p(ctx: MmArc) -> P2PResult<()> { }; let spawner = SwarmRuntime::new(ctx.spawner()); - let spawn_result = spawn_gossipsub(netid, force_p2p_key, spawner, seednodes, node_type, move |swarm| { + let max_num_streams: usize = ctx.conf["max_concurrent_connections"] + .as_u64() + .unwrap_or(512) + .try_into() + .unwrap_or(usize::MAX); + + let mut gossipsub_config = GossipsubConfig::new(netid, spawner, node_type); + gossipsub_config.to_dial(seednodes); + gossipsub_config.force_key(force_p2p_key); + gossipsub_config.max_num_streams(max_num_streams); + + let spawn_result = spawn_gossipsub(gossipsub_config, move |swarm| { let behaviour = swarm.behaviour(); mm_gauge!( ctx_on_poll.metrics, diff --git a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs index ca54fe53e1..34b6a6366c 100644 --- a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs +++ b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs @@ -591,33 +591,32 @@ type AtomicDexSwarm = Swarm; /// `panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime'`. #[allow(clippy::too_many_arguments)] fn start_gossipsub( - netid: u16, - force_key: Option<[u8; 32]>, - runtime: SwarmRuntime, - to_dial: Vec, - node_type: NodeType, + config: GossipsubConfig, on_poll: impl Fn(&AtomicDexSwarm) + Send + 'static, ) -> Result<(Sender, AdexEventRx, PeerId), AdexBehaviourError> { - let i_am_relay = node_type.is_relay(); + let i_am_relay = config.node_type.is_relay(); let mut rng = rand::thread_rng(); - let local_key = generate_ed25519_keypair(&mut rng, force_key); + let local_key = generate_ed25519_keypair(&mut rng, config.force_key); let local_peer_id = PeerId::from(local_key.public()); info!("Local peer id: {:?}", local_peer_id); let noise_config = noise::Config::new(&local_key).expect("Signing libp2p-noise static DH keypair failed."); - let network_info = node_type.to_network_info(); + let network_info = config.node_type.to_network_info(); info!("Network information: {:?}", network_info); let transport = match network_info { - NetworkInfo::InMemory => build_memory_transport(noise_config), - NetworkInfo::Distributed { .. } => build_dns_ws_transport(noise_config, node_type.wss_certs()), + NetworkInfo::InMemory => build_memory_transport(noise_config, config.max_num_streams), + NetworkInfo::Distributed { .. } => { + build_dns_ws_transport(noise_config, config.node_type.wss_certs(), config.max_num_streams) + }, }; let (cmd_tx, cmd_rx) = channel(CHANNEL_BUF_SIZE); let (event_tx, event_rx) = channel(CHANNEL_BUF_SIZE); - let bootstrap = to_dial + let bootstrap = config + .to_dial .into_iter() .map(|addr| addr.try_to_multiaddr(network_info)) .collect::, _>>()?; @@ -654,13 +653,13 @@ fn start_gossipsub( let mut gossipsub = Gossipsub::new(MessageAuthenticity::Author(local_peer_id), gossipsub_config) .map_err(|e| AdexBehaviourError::InitializationError(e.to_owned()))?; - let floodsub = Floodsub::new(local_peer_id, netid != DEFAULT_NETID); + let floodsub = Floodsub::new(local_peer_id, config.netid != DEFAULT_NETID); let mut peers_exchange = PeersExchange::new(network_info); if !network_info.in_memory() { // Please note WASM nodes don't support `PeersExchange` currently, // so `get_all_network_seednodes` returns an empty list. - for (peer_id, addr, _domain) in get_all_network_seednodes(netid) { + for (peer_id, addr, _domain) in get_all_network_seednodes(config.netid) { let multiaddr = addr.try_to_multiaddr(network_info)?; peers_exchange.add_peer_addresses_to_known_peers(&peer_id, iter::once(multiaddr).collect()); if peer_id != local_peer_id { @@ -686,12 +685,13 @@ fn start_gossipsub( let adex_behavior = AtomicDexBehaviour { core: core_behaviour, event_tx, - runtime: runtime.clone(), + runtime: config.runtime.clone(), cmd_rx, - netid, + netid: config.netid, }; - libp2p::swarm::SwarmBuilder::with_executor(transport, adex_behavior, local_peer_id, runtime.clone()).build() + libp2p::swarm::SwarmBuilder::with_executor(transport, adex_behavior, local_peer_id, config.runtime.clone()) + .build() }; swarm @@ -700,7 +700,7 @@ fn start_gossipsub( .floodsub .subscribe(FloodsubTopic::new(PEERS_TOPIC.to_owned())); - match node_type { + match config.node_type { NodeType::Relay { ip, network_ports, @@ -793,7 +793,7 @@ fn start_gossipsub( Poll::Pending }); - runtime.spawn(polling_fut.then(|_| futures::future::ready(()))); + config.runtime.spawn(polling_fut.then(|_| futures::future::ready(()))); Ok((cmd_tx, event_rx, local_peer_id)) } @@ -886,16 +886,19 @@ fn announce_my_addresses(swarm: &mut AtomicDexSwarm) { fn build_dns_ws_transport( noise_keys: noise::Config, _wss_certs: Option<&WssCerts>, + max_num_streams: usize, ) -> BoxedTransport<(PeerId, libp2p::core::muxing::StreamMuxerBox)> { let websocket = libp2p::wasm_ext::ffi::websocket_transport(); let transport = libp2p::wasm_ext::ExtTransport::new(websocket); - upgrade_transport(transport, noise_keys) + + upgrade_transport(transport, noise_keys, max_num_streams) } #[cfg(not(target_arch = "wasm32"))] fn build_dns_ws_transport( noise_keys: noise::Config, wss_certs: Option<&WssCerts>, + max_num_streams: usize, ) -> BoxedTransport<(PeerId, libp2p::core::muxing::StreamMuxerBox)> { use libp2p::websocket::tls as libp2p_tls; @@ -928,18 +931,22 @@ fn build_dns_ws_transport( .unwrap(); let transport = dns_tcp.or_transport(ws_dns_tcp); - upgrade_transport(transport, noise_keys) + upgrade_transport(transport, noise_keys, max_num_streams) } -fn build_memory_transport(noise_keys: noise::Config) -> BoxedTransport<(PeerId, libp2p::core::muxing::StreamMuxerBox)> { +fn build_memory_transport( + noise_keys: noise::Config, + max_num_streams: usize, +) -> BoxedTransport<(PeerId, libp2p::core::muxing::StreamMuxerBox)> { let transport = libp2p::core::transport::MemoryTransport::default(); - upgrade_transport(transport, noise_keys) + upgrade_transport(transport, noise_keys, max_num_streams) } /// Set up an encrypted Transport over the Mplex protocol. fn upgrade_transport( transport: T, noise_config: noise::Config, + max_num_streams: usize, ) -> BoxedTransport<(PeerId, libp2p::core::muxing::StreamMuxerBox)> where T: Transport + Send + Sync + 'static + std::marker::Unpin, @@ -948,10 +955,13 @@ where T::Dial: Send, T::Error: Send + Sync + 'static, { + let mut yamux_cfg = libp2p::yamux::Config::default(); + yamux_cfg.set_max_num_streams(max_num_streams); + transport .upgrade(libp2p::core::upgrade::Version::V1) .authenticate(noise_config) - .multiplex(libp2p::yamux::Config::default()) + .multiplex(yamux_cfg) .timeout(std::time::Duration::from_secs(20)) .map(|(peer, muxer), _| (peer, libp2p::core::muxing::StreamMuxerBox::new(muxer))) .boxed() @@ -1024,24 +1034,69 @@ impl NetworkBehaviour for AtomicDexBehaviour { } } +pub struct GossipsubConfig { + netid: u16, + force_key: Option<[u8; 32]>, + runtime: SwarmRuntime, + to_dial: Vec, + node_type: NodeType, + max_num_streams: usize, +} + +impl GossipsubConfig { + #[cfg(test)] + pub(crate) fn new_for_tests(runtime: SwarmRuntime, to_dial: Vec, node_type: NodeType) -> Self { + GossipsubConfig { + netid: 333, + force_key: None, + runtime, + to_dial, + node_type, + max_num_streams: 128, + } + } + + pub fn new(netid: u16, runtime: SwarmRuntime, node_type: NodeType) -> Self { + GossipsubConfig { + netid, + force_key: None, + runtime, + to_dial: vec![], + node_type, + max_num_streams: 512, + } + } + + pub fn to_dial(&mut self, to_dial: Vec) -> &mut Self { + self.to_dial = to_dial; + self + } + + pub fn force_key(&mut self, force_key: Option<[u8; 32]>) -> &mut Self { + self.force_key = force_key; + self + } + + pub fn max_num_streams(&mut self, max_num_streams: usize) -> &mut Self { + self.max_num_streams = max_num_streams; + self + } +} + /// Creates and spawns new AdexBehaviour Swarm returning: /// 1. tx to send control commands /// 2. rx emitting gossip events to processing side /// 3. our peer_id /// 4. abort handle to stop the P2P processing fut. pub async fn spawn_gossipsub( - netid: u16, - force_key: Option<[u8; 32]>, - runtime: SwarmRuntime, - to_dial: Vec, - node_type: NodeType, + config: GossipsubConfig, on_poll: impl Fn(&AtomicDexSwarm) + Send + 'static, ) -> Result<(Sender, AdexEventRx, PeerId), AdexBehaviourError> { let (result_tx, result_rx) = oneshot::channel(); - let runtime_c = runtime.clone(); + let runtime_c = config.runtime.clone(); let fut = async move { - let result = start_gossipsub(netid, force_key, runtime, to_dial, node_type, on_poll); + let result = start_gossipsub(config, on_poll); result_tx.send(result).unwrap(); }; diff --git a/mm2src/mm2_p2p/src/behaviours/mod.rs b/mm2src/mm2_p2p/src/behaviours/mod.rs index 0af8fd71c0..cdfda38c8d 100644 --- a/mm2src/mm2_p2p/src/behaviours/mod.rs +++ b/mm2src/mm2_p2p/src/behaviours/mod.rs @@ -23,6 +23,8 @@ mod tests { use crate::{spawn_gossipsub, AdexBehaviourCmd, AdexBehaviourEvent, AdexResponse, AdexResponseChannel, NetworkInfo, NetworkPorts, NodeType, RelayAddress, RequestResponseBehaviourEvent, SwarmRuntime}; + use super::atomicdex::GossipsubConfig; + static TEST_LISTEN_PORT: AtomicU64 = AtomicU64::new(1); lazy_static! { @@ -44,9 +46,11 @@ mod tests { let spawner = SwarmRuntime::new(SYSTEM.weak_spawner()); let node_type = NodeType::RelayInMemory { port }; let seednodes = seednodes.into_iter().map(RelayAddress::Memory).collect(); - let (cmd_tx, mut event_rx, peer_id) = spawn_gossipsub(333, None, spawner, seednodes, node_type, |_| {}) - .await - .expect("Error spawning AdexBehaviour"); + + let (cmd_tx, mut event_rx, peer_id) = + spawn_gossipsub(GossipsubConfig::new_for_tests(spawner, seednodes, node_type), |_| {}) + .await + .expect("Error spawning AdexBehaviour"); // spawn a response future let cmd_tx_fut = cmd_tx.clone();