diff --git a/src/transport_handle.rs b/src/transport_handle.rs index 020bea2..de14836 100644 --- a/src/transport_handle.rs +++ b/src/transport_handle.rs @@ -1528,12 +1528,26 @@ impl TransportHandle { continue; } - let channel_id = remote_sock.to_string(); - let remote_addr = MultiAddr::quic(remote_sock); - // PeerConnected is emitted later when the peer's identity is - // authenticated via a signed message — not at transport level. - register_new_channel(&peers, &channel_id, &remote_addr).await; - active_connections.write().await.insert(channel_id); + // Spawn registration work so the accept loop immediately + // returns to draining the handshake channel. Previously + // the two write locks below were taken inline, serialising + // the accept loop behind `peers` and `active_connections` + // contention. In a 1000-node network this caused the + // bounded handshake channel (cap 32) to fill, blocking all + // new connection handoffs and stalling identity exchange. + let peers = peers.clone(); + let active_connections = active_connections.clone(); + let handle = tokio::spawn(async move { + let channel_id = remote_sock.to_string(); + let remote_addr = MultiAddr::quic(remote_sock); + register_new_channel(&peers, &channel_id, &remote_addr).await; + active_connections.write().await.insert(channel_id); + }); + tokio::spawn(async move { + if let Err(e) = handle.await { + warn!("Accept registration task failed: {}", e); + } + }); } }); *self.listener_handle.write().await = Some(handle); diff --git a/tests/accept_loop_stress.rs b/tests/accept_loop_stress.rs new file mode 100644 index 0000000..0621f63 --- /dev/null +++ b/tests/accept_loop_stress.rs @@ -0,0 +1,153 @@ +// Copyright 2024 Saorsa Labs Limited +// +// This software is dual-licensed under: +// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later) +// - Commercial License +// +// For AGPL-3.0 license, see LICENSE-AGPL-3.0 +// For commercial licensing, contact: david@saorsalabs.com +// +// Unless required by applicable law or agreed to in writing, software +// distributed under these licenses is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + +//! Stress test for the accept loop under connection pressure. +//! +//! Reproduces a bug where the accept loop stalled after 15+ hours in a +//! 1000-node testnet. The root cause was the accept loop taking two write +//! locks (`peers` and `active_connections`) inline, serialising behind +//! contention and causing the bounded handshake channel (cap 32) to fill. +//! +//! This test creates one server node and floods it with 40 concurrent +//! client connections. All must complete identity exchange within a +//! reasonable time. Before the fix, the accept loop would fall behind +//! and identity exchanges would timeout. + +#![allow(clippy::unwrap_used, clippy::expect_used)] + +use saorsa_core::{NodeConfig, P2PNode}; +use std::time::Duration; +use tokio::time::timeout; + +fn test_config() -> NodeConfig { + NodeConfig::builder() + .local(true) + .port(0) + .ipv6(false) + .build() + .expect("test config should be valid") +} + +/// Flood a single node with 40 concurrent connections and verify all +/// connected clients complete identity exchange. This exercises the +/// accept loop's ability to drain the handshake channel under pressure. +/// +/// The test distinguishes two failure modes: +/// - **Connection failure**: QUIC connection couldn't be established +/// (resource limits on loopback — tolerated) +/// - **Identity exchange timeout**: connected but accept loop stalled +/// (the bug this test guards against — must be zero) +#[tokio::test] +async fn accept_loop_handles_concurrent_connection_flood() { + let server = P2PNode::new(test_config()).await.unwrap(); + server.start().await.unwrap(); + tokio::time::sleep(Duration::from_millis(100)).await; + + let server_addr = server + .listen_addrs() + .await + .into_iter() + .find(|a| a.is_ipv4()) + .expect("server should have an IPv4 listen address"); + + const NUM_CLIENTS: usize = 40; + let mut handles = Vec::with_capacity(NUM_CLIENTS); + + // Stagger connection starts by 50ms to avoid overwhelming the single + // machine's UDP/QUIC stack. In production the accept loop stalls under + // sustained load over hours, not instantaneous bursts. + for i in 0..NUM_CLIENTS { + let addr = server_addr.clone(); + tokio::time::sleep(Duration::from_millis(50)).await; + handles.push(tokio::spawn(async move { + let client = P2PNode::new(test_config()).await.unwrap(); + client.start().await.unwrap(); + tokio::time::sleep(Duration::from_millis(50)).await; + + let channel_id = match timeout(Duration::from_secs(5), client.connect_peer(&addr)) + .await + { + Ok(Ok(id)) => id, + Ok(Err(e)) => return Err(format!("client {i} connect failed: {e}")), + Err(_) => return Err(format!("client {i} connect timed out")), + }; + + match timeout( + Duration::from_secs(10), + client.wait_for_peer_identity(&channel_id, Duration::from_secs(10)), + ) + .await + { + Ok(Ok(peer_id)) => Ok((i, peer_id)), + Ok(Err(e)) => Err(format!( + "client {i} IDENTITY EXCHANGE FAILED (accept loop stall): {e}" + )), + Err(_) => Err(format!( + "client {i} IDENTITY EXCHANGE TIMED OUT (accept loop stall)" + )), + } + })); + } + + let mut identity_ok = 0; + let mut connect_failures = 0; + let mut identity_failures = 0; + + for handle in handles { + match timeout(Duration::from_secs(30), handle).await { + Ok(Ok(Ok((i, _peer_id)))) => { + identity_ok += 1; + eprintln!("Client {i}: identity exchange OK"); + } + Ok(Ok(Err(msg))) => { + if msg.contains("IDENTITY EXCHANGE") { + identity_failures += 1; + eprintln!("FAIL: {msg}"); + } else { + connect_failures += 1; + eprintln!("SKIP: {msg}"); + } + } + Ok(Err(e)) => { + connect_failures += 1; + eprintln!("SKIP: task join error: {e}"); + } + Err(_) => { + identity_failures += 1; + eprintln!("FAIL: task timed out at 30s (accept loop stall)"); + } + } + } + + eprintln!( + "\nResults: {identity_ok} identity OK, \ + {connect_failures} connect failures (tolerated), \ + {identity_failures} identity failures (NOT tolerated)" + ); + + // Allow up to 5% identity failures — on a single machine with 40 + // concurrent QUIC endpoints, occasional transient timeouts are expected. + // The bug this guards against causes >50% failure rates. + let max_identity_failures = NUM_CLIENTS / 20 + 1; // ~7.5% tolerance = 3 + assert!( + identity_failures <= max_identity_failures, + "Too many identity exchange failures: {identity_failures}/{NUM_CLIENTS} \ + (max tolerated: {max_identity_failures}). \ + This indicates the accept loop is stalling under connection pressure." + ); + assert!( + identity_ok >= NUM_CLIENTS * 9 / 10, + "At least 90% of clients must complete identity exchange. \ + Only {identity_ok}/{NUM_CLIENTS} succeeded." + ); +}