diff --git a/src/transport_handle.rs b/src/transport_handle.rs index 020bea2..7c43b9f 100644 --- a/src/transport_handle.rs +++ b/src/transport_handle.rs @@ -1971,51 +1971,30 @@ impl TransportHandle { ConnectionEvent::Established { remote_address, .. } => { - let channel_id = remote_address.to_string(); - debug!( - "Connection established: channel={}, addr={}", - channel_id, remote_address - ); - - active_connections.write().await.insert(channel_id.clone()); - - let mut peers_lock = peers.write().await; - if let Some(peer_info) = peers_lock.get_mut(&channel_id) { - peer_info.status = ConnectionStatus::Connected; - peer_info.connected_at = Instant::now(); - } else { - debug!("Registering new incoming channel: {}", channel_id); - peers_lock.insert( - channel_id.clone(), - PeerInfo { - channel_id: channel_id.clone(), - addresses: vec![MultiAddr::quic(remote_address)], - status: ConnectionStatus::Connected, - last_seen: Instant::now(), - connected_at: Instant::now(), - protocols: Vec::new(), - heartbeat_count: 0, - }, - ); - } - - // Send identity announce so the remote peer can authenticate us. - match Self::create_identity_announce_bytes(&node_identity, &user_agent) { - Ok(announce_bytes) => { - if let Err(e) = dual_node - .send_to_peer_optimized(&remote_address, &announce_bytes) - .await - { - warn!("Failed to send identity announce to {channel_id}: {e}"); + let dual_node_clone = Arc::clone(&dual_node); + let node_identity_ref = &node_identity; + let user_agent_ref = user_agent.as_str(); + Self::handle_connection_established( + &active_connections, + &peers, + remote_address, + || match Self::create_identity_announce_bytes( + node_identity_ref, + user_agent_ref, + ) { + Ok(bytes) => Some(bytes), + Err(e) => { + warn!("Failed to create identity announce: {e}"); + None } - } - Err(e) => { - warn!("Failed to create identity announce: {e}"); - } - } - - // PeerConnected is emitted when the remote receives and - // verifies our identity announce — not at transport level. 
+                        },
+                        move |addr, bytes| async move {
+                            dual_node_clone
+                                .send_to_peer_optimized(&addr, &bytes)
+                                .await
+                        },
+                    )
+                    .await;
                 }
                 ConnectionEvent::Lost { remote_address, reason }
                 | ConnectionEvent::Failed { remote_address, reason } => {
@@ -2053,6 +2032,91 @@ impl TransportHandle {
             }
         }
     }
+
+    /// Handle a `ConnectionEvent::Established` event: register the channel in
+    /// `active_connections` and `peers`, synthesise the identity-announce
+    /// payload via `make_announce_bytes`, then invoke `send_announce` to ship
+    /// it to the remote peer.
+    ///
+    /// Extracted from `connection_lifecycle_monitor_with_rx` so that tests can
+    /// inject controllable closures and observe whether the `peers` write
+    /// guard is released before the send future runs.
+    ///
+    /// Operation order is preserved from the original inline code:
+    /// 1. `active_connections.write().insert(...)`
+    /// 2. `peers.write()` + update-or-insert (guard scoped, released here)
+    /// 3. `make_announce_bytes()` (synchronous — after map updates so any
+    ///    future side effects on `node_identity` follow registration)
+    /// 4. `send_announce(...).await` (no locks held)
+    async fn handle_connection_established<M, F, Fut>(
+        active_connections: &Arc<RwLock<HashSet<String>>>,
+        peers: &Arc<RwLock<HashMap<String, PeerInfo>>>,
+        remote_address: SocketAddr,
+        make_announce_bytes: M,
+        send_announce: F,
+    ) where
+        M: FnOnce() -> Option<Vec<u8>> + Send,
+        F: FnOnce(SocketAddr, Vec<u8>) -> Fut + Send,
+        Fut: std::future::Future<Output = anyhow::Result<()>> + Send,
+    {
+        let channel_id = remote_address.to_string();
+        debug!("Connection established: channel={channel_id}, addr={remote_address}");
+
+        active_connections.write().await.insert(channel_id.clone());
+
+        // Scope the `peers` write guard tightly around the synchronous map
+        // mutation so it is released before the identity-announce send
+        // `.await` below.  Holding this guard across an `.await` on a QUIC
+        // send starves every concurrent reader of `peers` for the full
+        // duration of the network round-trip — including the 8 message
+        // dispatch shard consumers, DHT queries, and the accept loop.
+        {
+            let mut peers_lock = peers.write().await;
+            match peers_lock.get_mut(&channel_id) {
+                Some(peer_info) => {
+                    peer_info.status = ConnectionStatus::Connected;
+                    peer_info.connected_at = Instant::now();
+                }
+                None => {
+                    debug!("Registering new incoming channel: {channel_id}");
+                    peers_lock.insert(
+                        channel_id.clone(),
+                        PeerInfo {
+                            channel_id: channel_id.clone(),
+                            addresses: vec![MultiAddr::quic(remote_address)],
+                            status: ConnectionStatus::Connected,
+                            last_seen: Instant::now(),
+                            connected_at: Instant::now(),
+                            protocols: Vec::new(),
+                            heartbeat_count: 0,
+                        },
+                    );
+                }
+            }
+        }
+
+        // Synthesise announce bytes AFTER map registration so any future
+        // side effects (e.g. nonce bump on `node_identity`) follow the
+        // channel being visible in the maps. Preserves original ordering.
+        //
+        // INVARIANT: this call MUST run after the `peers` write guard has
+        // been dropped (the block above). Do not extend the guard scope
+        // past this point — `reader_not_blocked_by_slow_identity_announce`
+        // only catches guards held across the SEND await, not across this
+        // synchronous call.
+        let announce_bytes = make_announce_bytes();
+
+        // Send identity announce so the remote peer can authenticate us.
+        // No `peers` guard is held here — readers can proceed concurrently.
+        if let Some(bytes) = announce_bytes
+            && let Err(e) = send_announce(remote_address, bytes).await
+        {
+            warn!("Failed to send identity announce to {channel_id}: {e}");
+        }
+
+        // PeerConnected is emitted when the remote receives and
+        // verifies our identity announce — not at transport level.
+    }
 }
 
 // ============================================================================
@@ -2117,3 +2181,428 @@ impl TransportHandle {
             .insert(peer_id);
     }
 }
+
+// ============================================================================
+// Tests for `handle_connection_established` — verifies the `peers` write guard
+// is released before the identity-announce send future runs, so concurrent
+// readers are not blocked by a slow remote.
+// ============================================================================
+#[cfg(test)]
+mod connection_established_tests {
+    use super::*;
+    use std::sync::Arc;
+    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+    use std::time::Duration;
+    use tokio::sync::Notify;
+    use tokio::time::timeout;
+
+    type SharedActive = Arc<RwLock<HashSet<String>>>;
+    type SharedPeers = Arc<RwLock<HashMap<String, PeerInfo>>>;
+
+    /// Wall-clock timing budget for the regression test.
+    ///
+    /// `SLOW_SEND` is the length of the fake identity-announce send; a buggy
+    /// implementation (guard held across the send) blocks readers for the
+    /// full duration. `READ_BUDGET` is the absolute wall-clock ceiling for a
+    /// concurrent read to complete. On fixed code the read completes in
+    /// microseconds; the budget exists purely to absorb scheduler jitter
+    /// on loaded CI runners (GitHub Actions macOS runners have been
+    /// observed with 300-500ms scheduler stalls under load).
+    ///
+    /// The margin (SLOW_SEND / READ_BUDGET = 3x) leaves 800ms of headroom
+    /// before the test could false-positive on a stalled runner.
+ const SLOW_SEND: Duration = Duration::from_millis(1200); + const READ_BUDGET: Duration = Duration::from_millis(400); + + fn test_addr() -> SocketAddr { + "127.0.0.1:65535" + .parse() + .expect("static literal is a valid SocketAddr") + } + + fn numbered_addr(port: u16) -> SocketAddr { + let raw = format!("127.0.0.1:{port}"); + raw.parse().expect("numbered literal is a valid SocketAddr") + } + + fn empty_maps() -> (SharedActive, SharedPeers) { + ( + Arc::new(RwLock::new(HashSet::new())), + Arc::new(RwLock::new(HashMap::new())), + ) + } + + /// Regression test: while the identity-announce send is in flight, a + /// concurrent reader of the `peers` map must not be blocked. + /// + /// Before the fix (`peers_lock` held across `send_announce.await`) the + /// read times out, because the slow send holds the write guard for the + /// full `SLOW_SEND` duration. + /// + /// After the fix (`peers_lock` scoped to the synchronous map mutation) + /// the read returns well within `READ_BUDGET`. + /// + /// The test is intentionally wall-clock bound and cannot be accelerated + /// by `tokio::time::pause()` — the whole point is to observe real-time + /// reader starvation, not virtual-time behaviour. Do not introduce + /// `pause()` here. + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn reader_not_blocked_by_slow_identity_announce() { + let (active, peers) = empty_maps(); + let send_started = Arc::new(Notify::new()); + + let active_clone = Arc::clone(&active); + let peers_clone = Arc::clone(&peers); + let send_started_clone = Arc::clone(&send_started); + + let helper_task = tokio::spawn(async move { + TransportHandle::handle_connection_established( + &active_clone, + &peers_clone, + test_addr(), + || Some(b"dummy-announce".to_vec()), + move |_addr, _bytes| async move { + // Signal the reader that the map update is complete and + // the send phase has begun. 
`Notify::notify_one` stores + // a permit if no waiter is registered yet, so even if + // the main task reaches `notified().await` after this + // line runs, the wake-up is not lost. + send_started_clone.notify_one(); + tokio::time::sleep(SLOW_SEND).await; + Ok(()) + }, + ) + .await; + }); + + // Wait until the send phase is in progress, then try to read `peers`. + send_started.notified().await; + + let read_result = timeout(READ_BUDGET, async { + let guard = peers.read().await; + guard.len() + }) + .await; + + // Join the helper BEFORE asserting the read result so we never leak + // the task. If the read timed out (bug present) the helper still has + // the remainder of SLOW_SEND to run; we pay that cost to fail cleanly. + helper_task.await.expect("helper task panicked"); + + match read_result { + Ok(len) => assert_eq!( + len, 1, + "expected exactly one peer entry after handle_connection_established" + ), + Err(_) => panic!( + "read of peers timed out after {READ_BUDGET:?} while identity announce send was in flight — \ + the peers write guard is held across the send (regression or unfixed)" + ), + } + } + + /// Contract test — success path: the helper must leave both maps + /// populated with the channel id in `Connected` status. 
+ #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn contract_success_populates_maps() { + let (active, peers) = empty_maps(); + + TransportHandle::handle_connection_established( + &active, + &peers, + test_addr(), + || Some(b"announce".to_vec()), + |_addr, _bytes| async { Ok(()) }, + ) + .await; + + let channel_id = test_addr().to_string(); + assert!( + active.read().await.contains(&channel_id), + "active_connections should contain the channel id after Established" + ); + let peers_guard = peers.read().await; + let peer_info = peers_guard + .get(&channel_id) + .expect("peers map should contain the channel id after Established"); + assert_eq!(peer_info.status, ConnectionStatus::Connected); + assert_eq!(peer_info.channel_id, channel_id); + assert_eq!(peer_info.addresses.len(), 1); + } + + /// Contract test — send-error path: a failing send must NOT unwind the + /// map updates. Both maps remain populated, matching the behaviour of the + /// original inline code which only logged the error via `warn!`. + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn contract_send_error_preserves_map_state() { + let (active, peers) = empty_maps(); + + TransportHandle::handle_connection_established( + &active, + &peers, + test_addr(), + || Some(b"announce".to_vec()), + |_addr, _bytes| async { Err(anyhow::anyhow!("simulated send failure")) }, + ) + .await; + + let channel_id = test_addr().to_string(); + assert!( + active.read().await.contains(&channel_id), + "active_connections must stay populated even when the identity announce send fails" + ); + assert!( + peers.read().await.contains_key(&channel_id), + "peers must stay populated even when the identity announce send fails" + ); + } + + /// Contract test — reconnect path: a second `Established` event for the + /// same channel id must update the existing `PeerInfo` in place (not + /// duplicate it), flipping `connected_at` forward without removing and + /// re-inserting the entry. 
+ /// + /// This test captures `connected_at` after the first call and asserts it + /// strictly advances after the second call, which proves the helper took + /// the `Some(peer_info)` branch rather than the `None` re-insert branch. + /// No internal state is mutated from the test body. + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn contract_reconnect_updates_in_place() { + let (active, peers) = empty_maps(); + + // First connect. + TransportHandle::handle_connection_established( + &active, + &peers, + test_addr(), + || Some(b"announce".to_vec()), + |_addr, _bytes| async { Ok(()) }, + ) + .await; + + let connected_at_1 = peers + .read() + .await + .get(&test_addr().to_string()) + .expect("first connect should have inserted the entry") + .connected_at; + + // Guarantee the tokio Instant clock advances between the two calls + // so `connected_at_2 > connected_at_1` is observable. 2ms is well + // above Instant's resolution on every platform we target. + tokio::time::sleep(Duration::from_millis(2)).await; + + // Second connect for the same channel id. 
+ TransportHandle::handle_connection_established( + &active, + &peers, + test_addr(), + || Some(b"announce".to_vec()), + |_addr, _bytes| async { Ok(()) }, + ) + .await; + + let peers_guard = peers.read().await; + assert_eq!( + peers_guard.len(), + 1, + "reconnect must update in place, not duplicate" + ); + let peer_info = peers_guard + .get(&test_addr().to_string()) + .expect("entry should still exist after reconnect"); + assert_eq!( + peer_info.status, + ConnectionStatus::Connected, + "reconnect must leave status == Connected" + ); + assert!( + peer_info.connected_at > connected_at_1, + "reconnect must advance connected_at (Some branch), got {:?} vs {:?}", + peer_info.connected_at, + connected_at_1, + ); + } + + /// Contract test — `None` announce bytes: when `make_announce_bytes` + /// returns `None` (simulating `create_identity_announce_bytes` failing + /// upstream), the helper must still populate the maps but skip the + /// send. Matches the `warn!` + no-send behaviour of the original inline + /// code. 
+ #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn contract_missing_announce_bytes_still_populates_maps() { + let (active, peers) = empty_maps(); + let send_called = Arc::new(AtomicBool::new(false)); + let send_called_clone = Arc::clone(&send_called); + + TransportHandle::handle_connection_established( + &active, + &peers, + test_addr(), + || None, + move |_addr, _bytes| { + let flag = Arc::clone(&send_called_clone); + async move { + flag.store(true, Ordering::SeqCst); + Ok(()) + } + }, + ) + .await; + + let channel_id = test_addr().to_string(); + assert!(active.read().await.contains(&channel_id)); + assert!(peers.read().await.contains_key(&channel_id)); + assert!( + !send_called.load(Ordering::SeqCst), + "send closure must not be invoked when make_announce_bytes returns None" + ); + } + + /// Concurrent-caller correctness test: spawn N parallel + /// `handle_connection_established` calls on different channel IDs and + /// assert every one lands in the maps exactly once. Ports used are + /// string keys only — no socket is bound. + /// + /// This test catches HashMap corruption under disjoint-key concurrency + /// and acts as a compile-time latch against regressing the `Send` + /// bounds on `F` / `Fut` / `M` (spawning `handle_connection_established` + /// forces the compiler to verify the future is `Send`). + /// + /// It does NOT by itself catch "guard held across send await" — N + /// tasks would simply serialise on the write lock and still land in + /// the map. `concurrent_sends_run_in_parallel_not_serialised` below + /// is the test that proves concurrent send execution, and so catches + /// the held-guard bug under concurrency. 
+    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+    async fn concurrent_establish_different_channels_all_land() {
+        let (active, peers) = empty_maps();
+        const N: usize = 10;
+        let send_count = Arc::new(AtomicUsize::new(0));
+
+        let mut handles = Vec::with_capacity(N);
+        for i in 0..N {
+            let active_clone = Arc::clone(&active);
+            let peers_clone = Arc::clone(&peers);
+            let send_count_clone = Arc::clone(&send_count);
+            let addr = numbered_addr(10_000 + i as u16);
+
+            handles.push(tokio::spawn(async move {
+                TransportHandle::handle_connection_established(
+                    &active_clone,
+                    &peers_clone,
+                    addr,
+                    || Some(b"announce".to_vec()),
+                    move |_addr, _bytes| {
+                        let counter = Arc::clone(&send_count_clone);
+                        async move {
+                            counter.fetch_add(1, Ordering::SeqCst);
+                            Ok(())
+                        }
+                    },
+                )
+                .await;
+            }));
+        }
+
+        // Collect every result before asserting — we want to surface
+        // every panicking handle, not only the first one. Any join
+        // error is promoted to a test failure.
+        let mut join_errors: Vec<String> = Vec::new();
+        for (i, h) in handles.into_iter().enumerate() {
+            if let Err(e) = h.await {
+                join_errors.push(format!("handle {i}: {e}"));
+            }
+        }
+        assert!(
+            join_errors.is_empty(),
+            "one or more helper tasks panicked: {join_errors:?}"
+        );
+
+        assert_eq!(
+            active.read().await.len(),
+            N,
+            "every concurrent caller must leave an entry in active_connections"
+        );
+        assert_eq!(
+            peers.read().await.len(),
+            N,
+            "every concurrent caller must leave an entry in peers"
+        );
+        assert_eq!(
+            send_count.load(Ordering::SeqCst),
+            N,
+            "every concurrent caller must have invoked the send closure exactly once"
+        );
+        // Spot-check one entry to confirm it is Connected, not a partially
+        // constructed placeholder.
+ let peers_guard = peers.read().await; + let sample = peers_guard + .get(&numbered_addr(10_000).to_string()) + .expect("first spawned entry should be present"); + assert_eq!(sample.status, ConnectionStatus::Connected); + } + + /// Concurrent-send parallelism test: spawn N parallel + /// `handle_connection_established` calls, each with a send closure + /// that parks for `SLOW_SEND_PER_TASK`, and assert the total wall-clock + /// time is strictly less than `N * SLOW_SEND_PER_TASK / 2`. If the + /// `peers` write guard is held across the send await, the N tasks + /// serialise on the write lock and total time approaches + /// `N * SLOW_SEND_PER_TASK`. If the guard is released (the fix), the + /// sends run in parallel and total time is close to a single + /// `SLOW_SEND_PER_TASK`. + /// + /// This is the concurrency-domain counterpart of + /// `reader_not_blocked_by_slow_identity_announce` and closes the + /// coverage gap called out by reviewer A. + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn concurrent_sends_run_in_parallel_not_serialised() { + const N: usize = 8; + const SLOW_SEND_PER_TASK: Duration = Duration::from_millis(400); + // Serialised (buggy) lower bound: N * SLOW_SEND_PER_TASK = 3200ms. + // Parallel (fixed) expected: ~SLOW_SEND_PER_TASK = 400ms plus + // scheduler overhead. We assert strictly less than half the + // serialised time to leave generous CI headroom. 
+ let serialised_half = (SLOW_SEND_PER_TASK * N as u32) / 2; + + let (active, peers) = empty_maps(); + let mut handles = Vec::with_capacity(N); + let wall_start = std::time::Instant::now(); + + for i in 0..N { + let active_clone = Arc::clone(&active); + let peers_clone = Arc::clone(&peers); + let addr = numbered_addr(20_000 + i as u16); + + handles.push(tokio::spawn(async move { + TransportHandle::handle_connection_established( + &active_clone, + &peers_clone, + addr, + || Some(b"announce".to_vec()), + move |_addr, _bytes| async move { + tokio::time::sleep(SLOW_SEND_PER_TASK).await; + Ok(()) + }, + ) + .await; + })); + } + + for h in handles { + h.await.expect("helper task panicked"); + } + let elapsed = wall_start.elapsed(); + + assert!( + elapsed < serialised_half, + "N={N} concurrent sends took {elapsed:?}, expected strictly less than \ + {serialised_half:?} (half of the fully serialised time). \ + Sends appear to be serialising on the peers write lock — \ + the guard may be held across the send await." + ); + assert_eq!(peers.read().await.len(), N); + assert_eq!(active.read().await.len(), N); + } +}