diff --git a/src/network.rs b/src/network.rs
index 02aae27..45ee365 100644
--- a/src/network.rs
+++ b/src/network.rs
@@ -108,7 +108,7 @@ pub fn is_dht_participant(user_agent: &str) -> bool {
 }
 
 /// Capacity of the internal channel used by the message receiving system.
-pub(crate) const MESSAGE_RECV_CHANNEL_CAPACITY: usize = 256;
+pub(crate) const MESSAGE_RECV_CHANNEL_CAPACITY: usize = 2048;
 
 /// Maximum number of concurrent in-flight request/response operations.
 pub(crate) const MAX_ACTIVE_REQUESTS: usize = 256;
diff --git a/src/transport_handle.rs b/src/transport_handle.rs
index 0447b63..020bea2 100644
--- a/src/transport_handle.rs
+++ b/src/transport_handle.rs
@@ -1722,38 +1722,37 @@ impl TransportHandle {
             if let Some(ref app_id) = authenticated_node_id
                 && *app_id != self_peer_id
             {
-                // Hold `peer_to_channel` across the transport-level
-                // `register_connection_peer_id` call so the app-level
-                // map and the transport's internal addr→peer map are
-                // consistent for any concurrent reader. Without this,
-                // there is a microsecond window in which a hole-punch
-                // lookup that depends on the registration can fail
-                // spuriously even though the app-level map already
-                // says the peer is known.
-                let mut p2c = peer_to_channel.write().await;
-                let is_new_peer = !p2c.contains_key(app_id);
-                let channels = p2c.entry(*app_id).or_default();
-                let inserted = channels.insert(channel_id.clone());
+                // Update app-level peer↔channel mappings under write locks,
+                // then drop all locks before the async transport registration.
+                // This trades a microsecond consistency window (hole-punch
+                // lookups may briefly miss a peer that the app-level map
+                // already knows) for dramatically better throughput under
+                // load — the previous pattern held a write lock across an
+                // async call, blocking all 8 shard consumers on every
+                // authenticated message.
+                let (is_new_peer, inserted) = {
+                    let mut p2c = peer_to_channel.write().await;
+                    let is_new = !p2c.contains_key(app_id);
+                    let channels = p2c.entry(*app_id).or_default();
+                    let ins = channels.insert(channel_id.clone());
+                    (is_new, ins)
+                }; // p2c lock dropped here
+
                 if inserted {
                     channel_to_peers
                         .write()
                         .await
                         .entry(channel_id.clone())
                         .or_default()
                         .insert(*app_id);
                 }
 
-                // Register peer ID at the low-level transport
-                // endpoint so PUNCH_ME_NOW relay can find this
-                // peer by identity instead of socket address.
+                // Register peer ID at the low-level transport endpoint.
+                // Now runs without holding any write locks.
                 dual_node_for_peer_reg
                     .register_connection_peer_id(from_addr, *app_id.to_bytes())
                     .await;
 
-                // Drop the lock before emitting events so subscribers
-                // that re-enter the registry don't deadlock.
-                drop(p2c);
-
                 if is_new_peer {
                     peer_user_agents
                         .write()
@@ -1853,7 +1852,7 @@ const MESSAGE_DISPATCH_SHARDS: usize = 8;
 /// MESSAGE_DISPATCH_SHARDS`, but when that division rounds to something
 /// too small for healthy bursts we floor it at this value so each shard
 /// retains a reasonable amount of buffering headroom.
-const MIN_SHARD_CHANNEL_CAPACITY: usize = 16;
+const MIN_SHARD_CHANNEL_CAPACITY: usize = 128;
 
 /// Log a warning every Nth dropped message in the dispatcher.
 ///