diff --git a/src/dht/core_engine.rs b/src/dht/core_engine.rs index c7b76c4..300b3da 100644 --- a/src/dht/core_engine.rs +++ b/src/dht/core_engine.rs @@ -7,6 +7,7 @@ use crate::PeerId; use crate::address::MultiAddr; use crate::security::{IP_EXACT_LIMIT, IPDiversityConfig, canonicalize_ip, ip_subnet_limit}; use anyhow::{Result, anyhow}; +use parking_lot::Mutex as PlMutex; use serde::{Deserialize, Serialize}; use std::collections::HashSet; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; @@ -15,6 +16,71 @@ use std::time::{Duration, Instant}; use tokio::sync::RwLock; use tokio_util::sync::CancellationToken; +/// An [`Instant`] stored behind a synchronous mutex so it can be updated +/// from `&self` receivers. +/// +/// The key property: reads and writes only need `&self`, so the routing +/// table's hot touch path (called on every inbound DHT message) can run +/// under a read lock on the routing table instead of an exclusive write +/// lock. The previous write-lock design serialised all readers behind +/// every touch, which at 1000 nodes became the dominant contention point. +/// +/// Why a mutex instead of an atomic: `Instant` is opaque (no stable `u64` +/// representation) and can legitimately represent times in the past +/// (tests backdate `last_seen` to mark peers stale). Any epoch-based +/// `AtomicU64` encoding would have to either (a) panic/saturate on past +/// times, or (b) pick a process-lifetime epoch in the deep past, which +/// risks `Instant` underflow on recently booted systems. A +/// [`parking_lot::Mutex`] sidesteps all of this and is still +/// extremely fast on the uncontended path (single CAS to acquire + store +/// + single CAS to release — microseconds). +#[derive(Debug)] +pub struct AtomicInstant(PlMutex); + +impl AtomicInstant { + /// Return a fresh `AtomicInstant` set to the current time. + pub fn now() -> Self { + Self(PlMutex::new(Instant::now())) + } + + /// Wrap an existing `Instant`. 
+ pub fn from_instant(i: Instant) -> Self { + Self(PlMutex::new(i)) + } + + /// Load the current value as an `Instant`. + pub fn load(&self) -> Instant { + *self.0.lock() + } + + /// Atomically store the current time. + pub fn store_now(&self) { + *self.0.lock() = Instant::now(); + } + + /// Atomically store a specific `Instant`. + pub fn store(&self, i: Instant) { + *self.0.lock() = i; + } + + /// Time elapsed since the stored instant. + pub fn elapsed(&self) -> Duration { + self.load().elapsed() + } +} + +impl Clone for AtomicInstant { + fn clone(&self) -> Self { + Self(PlMutex::new(*self.0.lock())) + } +} + +impl Default for AtomicInstant { + fn default() -> Self { + Self::now() + } +} + #[cfg(test)] use crate::adaptive::trust::DEFAULT_NEUTRAL_TRUST; @@ -82,10 +148,13 @@ pub struct NodeInfo { #[serde(default)] pub address_types: Vec, /// Monotonic timestamp of last successful interaction. - /// Uses `Instant` to avoid NTP clock-jump issues. Skipped during + /// + /// Stored as an [`AtomicInstant`] so the routing table's touch path + /// can update it under a read lock, not a write lock. Uses `Instant` + /// under the hood to avoid NTP clock-jump issues. Skipped during /// serialization — deserialized `NodeInfo` defaults to "just seen." - #[serde(skip, default = "Instant::now")] - pub last_seen: Instant, + #[serde(skip, default = "AtomicInstant::now")] + pub last_seen: AtomicInstant, } impl NodeInfo { @@ -233,7 +302,7 @@ impl KBucket { // the parallel address_types vec stays in sync. 
if let Some(pos) = self.nodes.iter().position(|n| n.id == node.id) { let mut existing = self.nodes.remove(pos); - existing.last_seen = node.last_seen; + existing.last_seen.store(node.last_seen.load()); for (i, addr) in node.addresses.into_iter().enumerate() { let addr_type = node .address_types @@ -264,10 +333,12 @@ impl KBucket { self.nodes.retain(|n| &n.id != node_id); } - /// Update `last_seen` (and optionally merge an address) for a node, then - /// Update `last_seen` (and optionally merge a typed address) for a node, - /// then move it to the tail of the bucket (most recently seen) per Kademlia - /// protocol. + /// Slow path: update `last_seen`, optionally merge a typed address, and reorder the + /// bucket so the touched node becomes the most-recently-seen entry. + /// + /// Takes `&mut self` because merging an address may mutate the node's + /// address list. For the fast path (just bumping the timestamp when no + /// address merge is needed) see [`Self::touch_last_seen_if_merge_noop`]. fn touch_node_typed( &mut self, node_id: &PeerId, @@ -275,7 +346,7 @@ impl KBucket { addr_type: AddressType, ) -> bool { if let Some(pos) = self.nodes.iter().position(|n| &n.id == node_id) { - self.nodes[pos].last_seen = Instant::now(); + self.nodes[pos].last_seen.store_now(); if let Some(addr) = address { // Loopback injection prevention (Design Section 6.3 rule 4): let addr_is_loopback = addr @@ -298,6 +369,70 @@ impl KBucket { } } + /// Fast path: if `node_id` is in this bucket AND the optional address + /// merge would be a no-op (address is `None`, address is already + /// present **with the same `addr_type`**, or the loopback-injection + /// rule would skip the merge), atomically bump `last_seen` in place + /// (the bucket order is left untouched) and return `Some(true)`. + /// + /// Returns: + /// - `Some(true)` — fast path succeeded, `last_seen` updated. + /// - `Some(false)` — node is not in this bucket. 
+ /// - `None` — the address is either not yet present, or present with + /// a *different* type classification (e.g. learned as `Direct`, + /// now being promoted to `Relay`). The slow path must run so + /// [`merge_typed_address`] can re-insert at the type-priority + /// position. Without this guard the relay-promotion path in the + /// network bridge silently degrades to a `last_seen` bump and the + /// address ordering invariant is broken. + /// + /// Only requires `&self` — no bucket mutation, just an atomic store on + /// [`NodeInfo::last_seen`]. This lets the hot touch path (called on + /// every inbound DHT message) run under a read lock on the routing + /// table instead of an exclusive write lock. + fn touch_last_seen_if_merge_noop( + &self, + node_id: &PeerId, + address: Option<&MultiAddr>, + addr_type: AddressType, + ) -> Option { + let Some(pos) = self.nodes.iter().position(|n| &n.id == node_id) else { + return Some(false); + }; + let node = &self.nodes[pos]; + let merge_is_noop = match address { + None => true, + Some(addr) => { + // Already in the list → merge would reinsert at the same + // position, which is a no-op only if the existing entry + // has the same type classification. If the type differs + // we MUST escalate to the slow path so merge_typed_address + // can re-order by type priority. + if let Some(existing_pos) = node.addresses.iter().position(|a| a == addr) { + node.address_type_at(existing_pos) == addr_type + } else { + // Loopback-injection skip: if the candidate is + // loopback and the node already has a non-loopback + // address, the slow path would skip the merge entirely. 
+ let addr_is_loopback = addr + .ip() + .is_some_and(|ip| canonicalize_ip(ip).is_loopback()); + let node_has_non_loopback = node + .addresses + .iter() + .any(|a| a.ip().is_some_and(|ip| !canonicalize_ip(ip).is_loopback())); + addr_is_loopback && node_has_non_loopback + } + } + }; + if merge_is_noop { + node.last_seen.store_now(); + Some(true) + } else { + None + } + } + fn get_nodes(&self) -> &[NodeInfo] { &self.nodes } @@ -352,6 +487,28 @@ impl KademliaRoutingTable { } } + /// Fast path for the touch operation. + /// + /// Returns: + /// - `Some(true)` — node found and `last_seen` updated atomically. + /// - `Some(false)` — node is not in the routing table (fast-path result + /// is authoritative; no fallback needed). + /// - `None` — either `get_bucket_index` yields no bucket for `node_id`, or the node is present but the address merge would not be a + /// no-op (either the address is missing, or its type classification + /// differs from `addr_type`); the caller must escalate to + /// [`Self::touch_node`] under a write lock. + /// + /// Only takes `&self` so this can run under a `RwLock::read()` guard. + fn try_touch_last_seen( + &self, + node_id: &PeerId, + address: Option<&MultiAddr>, + addr_type: AddressType, + ) -> Option { + let bucket_index = self.get_bucket_index(node_id)?; + self.buckets[bucket_index].touch_last_seen_if_merge_noop(node_id, address, addr_type) + } + fn find_closest_nodes(&self, key: &DhtKey, count: usize) -> Vec { // Collect ALL entries from every bucket. Bucket index correlates with // distance from *self*, not from key K — peers in distant buckets can @@ -760,7 +917,7 @@ impl DhtCoreEngine { id: self.node_id, addresses: vec![], address_types: vec![], - last_seen: Instant::now(), + last_seen: AtomicInstant::now(), }; let self_dist = xor_distance_bytes(self.node_id.to_bytes(), key.as_bytes()); @@ -838,12 +995,45 @@ impl DhtCoreEngine { routing.touch_node(node_id, address, AddressType::Direct) } + /// Touch a peer's routing-table entry with an optional typed address. 
+ /// + /// **Fast path (read lock + atomic store):** If the peer is in the + /// routing table and the address merge would be a no-op (address is + /// `None`, or it's already in the peer's list with the same type classification, or the loopback rule + /// would skip it), this updates `last_seen` atomically under a read + /// lock with no bucket mutation. + /// + /// **Slow path (write lock):** If an address merge or re-classification is needed, + /// the method escalates to a write lock and uses the full + /// `touch_node` flow. + /// + /// This split removes the write lock from the common hot path — at + /// 1000 nodes the touch is called on every inbound DHT message, and + /// the write-lock version was the dominant contention point on the + /// routing table. pub async fn touch_node_typed( &self, node_id: &PeerId, address: Option<&MultiAddr>, addr_type: AddressType, ) -> bool { + // Fast path: read lock + atomic last_seen store. The fast path + // ALSO requires the address (if any) to already be present with + // the same type classification — see `touch_last_seen_if_merge_noop`. + // Promotion of an existing address from one classification to + // another (e.g. Direct → Relay) is intentionally pushed to the + // slow path so the bucket-level `merge_typed_address` can re-order. + { + let routing = self.routing_table.read().await; + match routing.try_touch_last_seen(node_id, address, addr_type) { + Some(true) => return true, + Some(false) => return false, + // Merge is non-trivial — fall through to the write-lock path. + None => {} + } + } + + // Slow path: address merge or re-classification needed, take write lock. 
let mut routing = self.routing_table.write().await; routing.touch_node(node_id, address, addr_type) } @@ -1011,13 +1201,13 @@ impl DhtCoreEngine { if matched_ip { count_ip += 1; if farthest_ip.as_ref().is_none_or(|(_, d, _)| dist > *d) { - farthest_ip = Some((n.id, dist, n.last_seen)); + farthest_ip = Some((n.id, dist, n.last_seen.load())); } } if matched_subnet { count_subnet += 1; if farthest_subnet.as_ref().is_none_or(|(_, d, _)| dist > *d) { - farthest_subnet = Some((n.id, dist, n.last_seen)); + farthest_subnet = Some((n.id, dist, n.last_seen.load())); } } } @@ -1088,13 +1278,13 @@ impl DhtCoreEngine { if matched_ip { count_ip += 1; if farthest_ip.as_ref().is_none_or(|(_, d, _)| dist > *d) { - farthest_ip = Some((n.id, dist, n.last_seen)); + farthest_ip = Some((n.id, dist, n.last_seen.load())); } } if matched_subnet { count_subnet += 1; if farthest_subnet.as_ref().is_none_or(|(_, d, _)| dist > *d) { - farthest_subnet = Some((n.id, dist, n.last_seen)); + farthest_subnet = Some((n.id, dist, n.last_seen.load())); } } } @@ -1234,7 +1424,7 @@ impl DhtCoreEngine { .position(|n| n.id == node.id) { let existing = &mut routing.buckets[bucket_idx].nodes[pos]; - existing.last_seen = Instant::now(); + existing.last_seen.store_now(); // Merge each address from the candidate, respecting loopback injection prevention for addr in &node.addresses { let addr_is_loopback = addr @@ -1531,7 +1721,7 @@ mod tests { id: PeerId::from_bytes([byte; 32]), addresses: vec![address.parse::().unwrap()], address_types: vec![AddressType::Direct], - last_seen: Instant::now(), + last_seen: AtomicInstant::now(), } } @@ -1633,7 +1823,7 @@ mod tests { .add_node(NodeInfo { id: PeerId::from_bytes(id_bytes), addresses: vec!["/ip4/10.0.0.1/udp/9000/quic".parse().unwrap()], - last_seen: Instant::now(), + last_seen: AtomicInstant::now(), address_types: vec![], }) .unwrap(); @@ -1644,7 +1834,7 @@ mod tests { .add_node(NodeInfo { id: PeerId::from_bytes(id_bytes), addresses: 
vec!["/ip4/10.0.0.2/udp/9000/quic".parse().unwrap()], - last_seen: Instant::now(), + last_seen: AtomicInstant::now(), address_types: vec![], }) .unwrap(); @@ -1677,7 +1867,7 @@ mod tests { .add_node(NodeInfo { id: PeerId::from_bytes(id_bytes), addresses: vec!["/ip4/10.0.0.1/udp/9000/quic".parse().unwrap()], - last_seen: Instant::now(), + last_seen: AtomicInstant::now(), address_types: vec![], }) .unwrap(); @@ -1688,7 +1878,7 @@ mod tests { .add_node(NodeInfo { id: PeerId::from_bytes(id_bytes), addresses: vec!["/ip4/10.0.0.2/udp/9000/quic".parse().unwrap()], - last_seen: Instant::now(), + last_seen: AtomicInstant::now(), address_types: vec![], }) .unwrap(); @@ -1722,7 +1912,7 @@ mod tests { .parse() .unwrap(), ], - last_seen: Instant::now(), + last_seen: AtomicInstant::now(), address_types: vec![], }) .unwrap(); @@ -1832,7 +2022,7 @@ mod tests { let node = NodeInfo { id: PeerId::from_bytes(id), addresses: vec!["/ip4/203.0.113.1/udp/9000/quic".parse().unwrap()], - last_seen: Instant::now(), + last_seen: AtomicInstant::now(), address_types: vec![], }; let result = dht.add_node_no_trust(node).await; @@ -1854,7 +2044,7 @@ mod tests { let node = NodeInfo { id: PeerId::from_bytes([1u8; 32]), addresses: vec![], - last_seen: Instant::now(), + last_seen: AtomicInstant::now(), address_types: vec![], }; assert!(bucket.add_node(node).is_err()); @@ -1873,7 +2063,7 @@ mod tests { let node = NodeInfo { id: PeerId::from_bytes([1u8; 32]), addresses, - last_seen: Instant::now(), + last_seen: AtomicInstant::now(), address_types: vec![], }; bucket.add_node(node).unwrap(); @@ -1899,7 +2089,7 @@ mod tests { let replacement = NodeInfo { id: PeerId::from_bytes([1u8; 32]), addresses, - last_seen: Instant::now(), + last_seen: AtomicInstant::now(), address_types: vec![], }; bucket.add_node(replacement).unwrap(); @@ -1916,7 +2106,7 @@ mod tests { NodeInfo { id: PeerId::from_bytes(id_bytes), addresses: vec![address.parse::().unwrap()], - last_seen: Instant::now(), + last_seen: 
AtomicInstant::now(), address_types: vec![], } } @@ -1970,7 +2160,7 @@ mod tests { let updated = NodeInfo { id: peer_id, addresses: vec!["/ip4/10.0.0.2/udp/9000/quic".parse().unwrap()], - last_seen: Instant::now(), + last_seen: AtomicInstant::now(), address_types: vec![], }; let result = dht.add_node_no_trust(updated).await; @@ -2039,7 +2229,7 @@ mod tests { .find(|n| n.id == PeerId::from_bytes(id_far)) .unwrap(); // Set last_seen to exceed the test live threshold - node.last_seen = Instant::now() - TEST_STALE_AGE; + node.last_seen.store(Instant::now() - TEST_STALE_AGE); } // A closer candidate with the same IP @@ -2153,7 +2343,7 @@ mod tests { let node = NodeInfo { id: PeerId::from_bytes(id), addresses: vec!["/ip4/10.0.0.1/udp/9000/quic".parse().unwrap()], - last_seen: Instant::now(), + last_seen: AtomicInstant::now(), address_types: vec![], }; @@ -2210,7 +2400,7 @@ mod tests { .iter_mut() .find(|n| n.id == PeerId::from_bytes(id_far)) .unwrap(); - node.last_seen = Instant::now() - TEST_STALE_AGE; + node.last_seen.store(Instant::now() - TEST_STALE_AGE); } // A closer candidate with the same IP triggers swap @@ -2301,7 +2491,7 @@ mod tests { id_a[31] = 1; let bucket_idx = routing.get_bucket_index(&PeerId::from_bytes(id_a)).unwrap(); for node in &mut routing.buckets[bucket_idx].nodes { - node.last_seen = Instant::now() - TEST_STALE_AGE; + node.last_seen.store(Instant::now() - TEST_STALE_AGE); } } @@ -2488,7 +2678,7 @@ mod tests { id_a[31] = 1; let bucket_idx = routing.get_bucket_index(&PeerId::from_bytes(id_a)).unwrap(); for node in &mut routing.buckets[bucket_idx].nodes { - node.last_seen = Instant::now() - TEST_STALE_AGE; + node.last_seen.store(Instant::now() - TEST_STALE_AGE); } } @@ -2551,7 +2741,7 @@ mod tests { .iter_mut() .find(|n| n.id == PeerId::from_bytes(id_stale)) .unwrap(); - node.last_seen = Instant::now() - TEST_STALE_AGE; + node.last_seen.store(Instant::now() - TEST_STALE_AGE); let stale = DhtCoreEngine::collect_stale_peers_in_bucket( &routing, @@ 
-2668,7 +2858,7 @@ mod tests { let node = NodeInfo { id: PeerId::from_bytes(id), addresses: vec![bt_addr], - last_seen: Instant::now(), + last_seen: AtomicInstant::now(), address_types: vec![], }; @@ -2693,7 +2883,7 @@ mod tests { let n = NodeInfo { id: PeerId::from_bytes(node_id), addresses: vec![bt], - last_seen: Instant::now(), + last_seen: AtomicInstant::now(), address_types: vec![], }; let r = dht.add_node_no_trust(n).await; @@ -2890,7 +3080,7 @@ mod tests { .iter_mut() .find(|n| n.id == PeerId::from_bytes(id_far)) .unwrap(); - node.last_seen = Instant::now() - TEST_STALE_AGE; + node.last_seen.store(Instant::now() - TEST_STALE_AGE); } let far_peer = PeerId::from_bytes(id_far); diff --git a/src/dht/security_tests.rs b/src/dht/security_tests.rs index 479beec..f0877d6 100644 --- a/src/dht/security_tests.rs +++ b/src/dht/security_tests.rs @@ -1,14 +1,13 @@ use crate::PeerId; -use crate::dht::core_engine::{DhtCoreEngine, NodeInfo}; +use crate::dht::core_engine::{AtomicInstant, DhtCoreEngine, NodeInfo}; use crate::security::IPDiversityConfig; -use std::time::Instant; /// Helper: create a NodeInfo with a specific PeerId (from byte array) and address. 
fn make_node_with_id(id_bytes: [u8; 32], addr: &str) -> NodeInfo { NodeInfo { id: PeerId::from_bytes(id_bytes), addresses: vec![addr.parse().unwrap()], - last_seen: Instant::now(), + last_seen: AtomicInstant::now(), address_types: vec![], } } @@ -186,7 +185,6 @@ async fn test_ipv4_ip_override_raises_limit() -> anyhow::Result<()> { engine.set_ip_diversity_config(IPDiversityConfig { max_per_ip: Some(3), max_per_subnet: Some(usize::MAX), - ..IPDiversityConfig::default() }); for i in 1..=3u8 { @@ -433,7 +431,7 @@ async fn test_self_insertion_rejected() -> anyhow::Result<()> { let self_node = NodeInfo { id: self_id, addresses: vec!["/ip4/10.0.0.1/udp/9000/quic".parse().unwrap()], - last_seen: Instant::now(), + last_seen: AtomicInstant::now(), address_types: vec![], }; let result = engine.add_node_no_trust(self_node).await; diff --git a/src/dht_network_manager.rs b/src/dht_network_manager.rs index 025ed4c..4973d19 100644 --- a/src/dht_network_manager.rs +++ b/src/dht_network_manager.rs @@ -23,7 +23,7 @@ use crate::{ adaptive::TrustEngine, adaptive::trust::DEFAULT_NEUTRAL_TRUST, address::MultiAddr, - dht::core_engine::NodeInfo, + dht::core_engine::{AtomicInstant, NodeInfo}, dht::{AdmissionResult, DhtCoreEngine, DhtKey, Key, RoutingTableEvent}, error::{DhtError, IdentityError, NetworkError}, network::NodeConfig, @@ -1590,7 +1590,8 @@ impl DhtNetworkManager { let pid_bytes = *peer_id.to_bytes(); info!( "dial_candidate: setting hole_punch_target_peer_id for {} = {}", - socket_addr, hex::encode(&pid_bytes[..8]) + socket_addr, + hex::encode(&pid_bytes[..8]) ); self.transport .set_hole_punch_target_peer_id(socket_addr, pid_bytes) @@ -2111,7 +2112,7 @@ impl DhtNetworkManager { id: node_id, addresses, address_types, - last_seen: Instant::now(), + last_seen: AtomicInstant::now(), }; let trust_fn = |peer_id: &PeerId| -> f64 { diff --git a/src/transport_handle.rs b/src/transport_handle.rs index 5ee99a7..a8da4ed 100644 --- a/src/transport_handle.rs +++ b/src/transport_handle.rs @@ 
-31,9 +31,12 @@ use crate::network::{ use crate::transport::saorsa_transport_adapter::{ConnectionEvent, DualStackNetworkNode}; use crate::validation::{RateLimitConfig, RateLimiter}; +use std::collections::hash_map::DefaultHasher; use std::collections::{HashMap, HashSet}; +use std::hash::{Hash, Hasher}; use std::net::SocketAddr; use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; use std::time::Duration; use tokio::sync::{RwLock, broadcast}; use tokio::task::JoinHandle; @@ -580,7 +583,9 @@ impl TransportHandle { /// Set the target peer ID for a hole-punch attempt to a specific address. /// See [`P2pEndpoint::set_hole_punch_target_peer_id`]. pub async fn set_hole_punch_target_peer_id(&self, target: SocketAddr, peer_id: [u8; 32]) { - self.dual_node.set_hole_punch_target_peer_id(target, peer_id).await; + self.dual_node + .set_hole_punch_target_peer_id(target, peer_id) + .await; } /// Set a preferred coordinator for hole-punching to a specific target. @@ -1374,149 +1379,344 @@ impl TransportHandle { Ok(()) } - /// Spawns per-stack recv tasks and a dispatcher that routes incoming messages. + /// Spawns per-stack recv tasks and a **sharded** dispatcher that routes + /// incoming messages across [`MESSAGE_DISPATCH_SHARDS`] parallel consumer + /// tasks. + /// + /// # Why sharded? + /// + /// The previous implementation used a single consumer task to drain + /// every inbound message in the entire node. At 60 peers this kept up + /// comfortably, but at 1000 peers it became the dominant serialisation + /// point: each message pass through this loop took three async write + /// locks (`peer_to_channel`, `channel_to_peers`, `peer_user_agents`) + /// and an awaited `register_connection_peer_id` call before the next + /// message could even be looked at. 
Responses arrived late, past the + /// 25 s caller timeout, producing the `[STEP 6 FAILED]` and + /// `[STEP 5a FAILED] Response channel closed (receiver timed out)` + /// cascades observed in the 1000-node testnet logs. + /// + /// Sharding by hash of the source IP gives each shard its own consumer + /// running in parallel, so lock contention is now distributed across N + /// simultaneous writers instead of serialised behind a single task. + /// Messages from the **same peer** always route to the **same shard** + /// (ordering is preserved per peer). The dispatcher task is light + /// (hash + channel send) so it is never the bottleneck. async fn start_message_receiving_system(&self) -> Result<()> { - info!("Starting message receiving system"); + info!( + "Starting message receiving system ({} dispatch shards)", + MESSAGE_DISPATCH_SHARDS + ); - let (tx, mut rx) = tokio::sync::mpsc::channel(MESSAGE_RECV_CHANNEL_CAPACITY); + let (upstream_tx, mut upstream_rx) = + tokio::sync::mpsc::channel(MESSAGE_RECV_CHANNEL_CAPACITY); let mut handles = self .dual_node - .spawn_recv_tasks(tx.clone(), self.shutdown.clone()); - drop(tx); - - let event_tx = self.event_tx.clone(); - let active_requests = Arc::clone(&self.active_requests); - let peer_to_channel = Arc::clone(&self.peer_to_channel); - let channel_to_peers = Arc::clone(&self.channel_to_peers); - let peer_user_agents = Arc::clone(&self.peer_user_agents); - let self_peer_id = *self.node_identity.peer_id(); - let dual_node_for_peer_reg = Arc::clone(&self.dual_node); + .spawn_recv_tasks(upstream_tx.clone(), self.shutdown.clone()); + drop(upstream_tx); + + // Per-shard capacity so the aggregate buffered depth matches the old + // single-channel capacity, keeping memory usage comparable. Floor + // at `MIN_SHARD_CHANNEL_CAPACITY` so each shard retains enough + // slack for small bursts even if the global capacity is tiny. 
+ let per_shard_capacity = (MESSAGE_RECV_CHANNEL_CAPACITY / MESSAGE_DISPATCH_SHARDS) + .max(MIN_SHARD_CHANNEL_CAPACITY); + + let mut shard_txs: Vec)>> = + Vec::with_capacity(MESSAGE_DISPATCH_SHARDS); + + for shard_idx in 0..MESSAGE_DISPATCH_SHARDS { + let (shard_tx, shard_rx) = tokio::sync::mpsc::channel(per_shard_capacity); + shard_txs.push(shard_tx); + + let event_tx = self.event_tx.clone(); + let active_requests = Arc::clone(&self.active_requests); + let peer_to_channel = Arc::clone(&self.peer_to_channel); + let channel_to_peers = Arc::clone(&self.channel_to_peers); + let peer_user_agents = Arc::clone(&self.peer_user_agents); + let self_peer_id = *self.node_identity.peer_id(); + let dual_node_for_peer_reg = Arc::clone(&self.dual_node); + + handles.push(tokio::spawn(async move { + Self::run_shard_consumer( + shard_idx, + shard_rx, + event_tx, + active_requests, + peer_to_channel, + channel_to_peers, + peer_user_agents, + self_peer_id, + dual_node_for_peer_reg, + ) + .await; + })); + } + + // Dispatcher: single task whose only job is to hash `from_addr` and + // hand the message off to the appropriate shard. The actual heavy + // lifting happens in parallel in the shard consumers. + // + // Failure isolation: a single shard's `try_send` failure must NOT + // collapse the dispatcher. If a shard channel is full we log and + // drop the message (incrementing a counter). If a shard task has + // panicked and its receiver is closed we log and drop, but keep + // routing to the other healthy shards. The dispatcher only exits + // when its upstream channel closes (i.e. transport shutdown). 
+ let drop_counter = Arc::new(AtomicU64::new(0)); handles.push(tokio::spawn(async move { - info!("Message receive loop started"); - while let Some((from_addr, bytes)) = rx.recv().await { - let channel_id = from_addr.to_string(); - trace!("Received {} bytes from channel {}", bytes.len(), channel_id); - - match parse_protocol_message(&bytes, &channel_id) { - Some(ParsedMessage { - event, - authenticated_node_id, - user_agent: peer_user_agent, - }) => { - // If the message was signed, record the app↔channel mapping. - // A peer may be reachable over multiple channels simultaneously - // (e.g. QUIC + Bluetooth), so we add to the set — never replace. - // Skip our own identity to avoid self-registration via echoed messages. - if let Some(ref app_id) = authenticated_node_id - && *app_id != self_peer_id - { - let mut p2c = peer_to_channel.write().await; - let is_new_peer = !p2c.contains_key(app_id); - let channels = p2c.entry(*app_id).or_default(); - let inserted = channels.insert(channel_id.clone()); - if inserted { - channel_to_peers - .write() - .await - .entry(channel_id.clone()) - .or_default() - .insert(*app_id); - } - // Drop the lock before emitting events. - drop(p2c); - - // Register peer ID at the low-level transport - // endpoint so PUNCH_ME_NOW relay can find this - // peer by identity instead of socket address. 
- dual_node_for_peer_reg - .register_connection_peer_id(from_addr, *app_id.to_bytes()) - .await; - - if is_new_peer { - peer_user_agents - .write() - .await - .insert(*app_id, peer_user_agent.clone()); - broadcast_event( - &event_tx, - P2PEvent::PeerConnected(*app_id, peer_user_agent.clone()), - ); - } + info!( + "Message dispatcher loop started (sharded across {} consumers)", + MESSAGE_DISPATCH_SHARDS + ); + while let Some((from_addr, bytes)) = upstream_rx.recv().await { + let shard_idx = shard_index_for_addr(&from_addr); + match shard_txs[shard_idx].try_send((from_addr, bytes)) { + Ok(()) => {} + Err(tokio::sync::mpsc::error::TrySendError::Full(_dropped)) => { + // Backpressure: this shard is overloaded. Drop the + // message rather than blocking the dispatcher and + // starving the other shards. Per-shard ordering for + // this peer is broken for the dropped message but + // preserved for everything that does land. + let prev = drop_counter.fetch_add(1, Ordering::Relaxed); + if prev.is_multiple_of(SHARD_DROP_LOG_INTERVAL) { + warn!( + shard = shard_idx, + from = %from_addr, + total_drops = prev + 1, + "Dispatcher dropped inbound message: shard channel full" + ); } + } + Err(tokio::sync::mpsc::error::TrySendError::Closed(_dropped)) => { + // Shard consumer task has exited (likely panic). + // Drop this message but keep routing to the other + // shards — fault isolation, not cascade failure. + let prev = drop_counter.fetch_add(1, Ordering::Relaxed); + if prev.is_multiple_of(SHARD_DROP_LOG_INTERVAL) { + warn!( + shard = shard_idx, + from = %from_addr, + total_drops = prev + 1, + "Dispatcher dropped inbound message: shard consumer closed" + ); + } + } + } + } + info!("Message dispatcher loop ended — upstream channel closed"); + })); - // Identity announces are internal plumbing — don't - // emit as app-level messages. - if let P2PEvent::Message { ref topic, .. 
} = event - && topic == IDENTITY_ANNOUNCE_PROTOCOL - { - continue; + *self.recv_handles.write().await = handles; + Ok(()) + } + + /// Consumer loop for a single dispatch shard. + /// + /// Each shard runs one of these in its own `tokio::spawn` task. Shard + /// assignment is by hash of the source IP, so messages from the same + /// peer always go through the same shard (ordering is preserved per + /// peer). Shared state (`peer_to_channel`, `active_requests`, etc.) is + /// still behind global `RwLock`s but the lock hold times are now + /// spread across [`MESSAGE_DISPATCH_SHARDS`] concurrent consumers + /// instead of being fully serialised. + #[allow(clippy::too_many_arguments)] + async fn run_shard_consumer( + shard_idx: usize, + mut shard_rx: tokio::sync::mpsc::Receiver<(SocketAddr, Vec)>, + event_tx: broadcast::Sender, + active_requests: Arc>>, + peer_to_channel: Arc>>>, + channel_to_peers: Arc>>>, + peer_user_agents: Arc>>, + self_peer_id: PeerId, + dual_node_for_peer_reg: Arc, + ) { + info!("Message dispatch shard {shard_idx} started"); + while let Some((from_addr, bytes)) = shard_rx.recv().await { + let channel_id = from_addr.to_string(); + trace!( + shard = shard_idx, + "Received {} bytes from channel {}", + bytes.len(), + channel_id + ); + + match parse_protocol_message(&bytes, &channel_id) { + Some(ParsedMessage { + event, + authenticated_node_id, + user_agent: peer_user_agent, + }) => { + // If the message was signed, record the app↔channel mapping. + // A peer may be reachable over multiple channels simultaneously + // (e.g. QUIC + Bluetooth), so we add to the set — never replace. + // Skip our own identity to avoid self-registration via echoed messages. + if let Some(ref app_id) = authenticated_node_id + && *app_id != self_peer_id + { + // Hold `peer_to_channel` across the transport-level + // `register_connection_peer_id` call so the app-level + // map and the transport's internal addr→peer map are + // consistent for any concurrent reader. 
Without this, + // there is a microsecond window in which a hole-punch + // lookup that depends on the registration can fail + // spuriously even though the app-level map already + // says the peer is known. + let mut p2c = peer_to_channel.write().await; + let is_new_peer = !p2c.contains_key(app_id); + let channels = p2c.entry(*app_id).or_default(); + let inserted = channels.insert(channel_id.clone()); + if inserted { + channel_to_peers + .write() + .await + .entry(channel_id.clone()) + .or_default() + .insert(*app_id); } - if let P2PEvent::Message { - ref topic, - ref data, - .. - } = event - && topic.starts_with("/rr/") - && let Ok(envelope) = - postcard::from_bytes::(data) - && envelope.is_response - { - let mut reqs = active_requests.write().await; - let expected_peer = match reqs.get(&envelope.message_id) { - Some(pending) => pending.expected_peer, - None => { - trace!( - message_id = %envelope.message_id, - "Unmatched /rr/ response (likely timed out) — suppressing" - ); - continue; - } - }; - // Accept response only if the authenticated app-level - // identity matches. Channel IDs identify connections, - // not peers, so they are not checked here. - if authenticated_node_id.as_ref() != Some(&expected_peer) { - warn!( + // Register peer ID at the low-level transport + // endpoint so PUNCH_ME_NOW relay can find this + // peer by identity instead of socket address. + dual_node_for_peer_reg + .register_connection_peer_id(from_addr, *app_id.to_bytes()) + .await; + + // Drop the lock before emitting events so subscribers + // that re-enter the registry don't deadlock. + drop(p2c); + + if is_new_peer { + peer_user_agents + .write() + .await + .insert(*app_id, peer_user_agent.clone()); + broadcast_event( + &event_tx, + P2PEvent::PeerConnected(*app_id, peer_user_agent.clone()), + ); + } + } + + // Identity announces are internal plumbing — don't + // emit as app-level messages. + if let P2PEvent::Message { ref topic, .. 
} = event + && topic == IDENTITY_ANNOUNCE_PROTOCOL + { + continue; + } + + if let P2PEvent::Message { + ref topic, + ref data, + .. + } = event + && topic.starts_with("/rr/") + && let Ok(envelope) = postcard::from_bytes::(data) + && envelope.is_response + { + let mut reqs = active_requests.write().await; + let expected_peer = match reqs.get(&envelope.message_id) { + Some(pending) => pending.expected_peer, + None => { + trace!( message_id = %envelope.message_id, - expected = %expected_peer, - actual_channel = %channel_id, - authenticated = ?authenticated_node_id, - "Response origin mismatch — ignoring" + "Unmatched /rr/ response (likely timed out) — suppressing" ); continue; } - if let Some(pending) = reqs.remove(&envelope.message_id) { - if pending.response_tx.send(envelope.payload).is_err() { - warn!( - message_id = %envelope.message_id, - "Response receiver dropped before delivery" - ); - } - continue; - } - trace!( + }; + // Accept response only if the authenticated app-level + // identity matches. Channel IDs identify connections, + // not peers, so they are not checked here. 
+ if authenticated_node_id.as_ref() != Some(&expected_peer) { + warn!( message_id = %envelope.message_id, - "Unmatched /rr/ response (likely timed out) — suppressing" + expected = %expected_peer, + actual_channel = %channel_id, + authenticated = ?authenticated_node_id, + "Response origin mismatch — ignoring" ); continue; } - broadcast_event(&event_tx, event); - } - None => { - warn!("Failed to parse protocol message ({} bytes)", bytes.len()); + if let Some(pending) = reqs.remove(&envelope.message_id) { + if pending.response_tx.send(envelope.payload).is_err() { + warn!( + message_id = %envelope.message_id, + "Response receiver dropped before delivery" + ); + } + continue; + } + trace!( + message_id = %envelope.message_id, + "Unmatched /rr/ response (likely timed out) — suppressing" + ); + continue; } + broadcast_event(&event_tx, event); + } + None => { + warn!( + shard = shard_idx, + "Failed to parse protocol message ({} bytes)", + bytes.len() + ); } } - info!("Message receive loop ended — channel closed"); - })); - - *self.recv_handles.write().await = handles; - Ok(()) + } + info!("Message dispatch shard {shard_idx} ended — channel closed"); } } +/// Number of parallel dispatch shards for inbound messages. +/// +/// Messages are routed to a shard by hash of the source IP so each peer's +/// messages are processed by the same consumer (preserving per-peer +/// ordering) while different peers' messages run in parallel. Picked to +/// match typical core counts on deployment hardware — tuning higher helps +/// only if the shared state `RwLock`s are no longer the dominant +/// contention, which is not the case today. +const MESSAGE_DISPATCH_SHARDS: usize = 8; + +/// Minimum mpsc capacity for an individual dispatch shard channel. 
+/// +/// The per-shard capacity is normally `MESSAGE_RECV_CHANNEL_CAPACITY / +/// MESSAGE_DISPATCH_SHARDS`, but when that division rounds to something +/// too small for healthy bursts we floor it at this value so each shard +/// retains a reasonable amount of buffering headroom. +const MIN_SHARD_CHANNEL_CAPACITY: usize = 16; + +/// Log a warning every Nth dropped message in the dispatcher. +/// +/// `try_send` failures (channel full, or shard task closed) increment a +/// global drop counter; logging at every drop would flood the log under +/// sustained backpressure, so we coalesce to one warning per +/// `SHARD_DROP_LOG_INTERVAL` drops. The first drop in a burst is always +/// logged so the operator sees the onset. +const SHARD_DROP_LOG_INTERVAL: u64 = 64; + +/// Pick the dispatch shard for an inbound message. +/// +/// Hashes by `IpAddr` (not full `SocketAddr`) so a peer re-connecting from +/// a new ephemeral port still lands in the same shard. +/// +/// **Ordering caveat:** ordering is preserved per *source IP*, not per +/// authenticated peer. If a peer's public IP changes (NAT rebinding to a +/// new external address, mobile Wi-Fi↔cellular roaming, dual-stack +/// failover) it now hashes to a different shard, and messages from the +/// old IP that are still queued in the old shard may be processed +/// concurrently with new messages from the new IP. Application-layer +/// causality across an IP change is *not* guaranteed by this dispatcher. +fn shard_index_for_addr(addr: &SocketAddr) -> usize { + let mut hasher = DefaultHasher::new(); + addr.ip().hash(&mut hasher); + (hasher.finish() as usize) % MESSAGE_DISPATCH_SHARDS +} + // ============================================================================ // Shutdown // ============================================================================