From ff30dab46fad23b03b926b5f73cac4d69e48cf42 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Thu, 12 Feb 2026 09:59:40 +0100 Subject: [PATCH 1/7] fix(dht): revert participation-only routing table promotion --- src/dht_network_manager.rs | 267 ++++++++++++------------------------- 1 file changed, 85 insertions(+), 182 deletions(-) diff --git a/src/dht_network_manager.rs b/src/dht_network_manager.rs index 49522149..212a4603 100644 --- a/src/dht_network_manager.rs +++ b/src/dht_network_manager.rs @@ -346,8 +346,6 @@ pub struct DhtPeerInfo { pub avg_latency: Duration, /// Reliability score (0.0 to 1.0) pub reliability_score: f64, - /// Whether this peer has been promoted to the routing table. - pub in_routing_table: bool, } /// DHT network events @@ -2168,15 +2166,9 @@ impl DhtNetworkManager { }) } - /// Update peer information. - /// - /// Lookup-only traffic (e.g. `FindNode`, `FindValue`, `Get`) updates peer - /// liveness metadata but does not promote the peer into the routing table. - /// Routing promotion is reserved for participation operations (`Put`/`Join`) - /// so short-lived request clients are not inserted as DHT routing peers. - async fn update_peer_info(&self, peer_id: PeerId, message: &DhtNetworkMessage) { + /// Update peer information + async fn update_peer_info(&self, peer_id: PeerId, _message: &DhtNetworkMessage) { let dht_key = crate::dht::derive_dht_key_from_peer_id(&peer_id); - let should_promote = Self::should_promote_to_routing(message); // Get peer addresses from transport layer let addresses = if let Some(peer_info) = self.transport.peer_info(&peer_id).await { @@ -2186,82 +2178,29 @@ impl DhtNetworkManager { Vec::new() }; - let (should_add_to_routing, routing_address) = { - let mut peers = self.dht_peers.write().await; - let peer_info = peers.entry(peer_id.clone()).or_insert_with(|| DhtPeerInfo { - peer_id: peer_id.clone(), - dht_key, - addresses: Vec::new(), - last_seen: Instant::now(), - is_connected: true, - avg_latency: Duration::from_millis(50), - reliability_score: 1.0, - in_routing_table: false, - }); - - peer_info.last_seen = Instant::now(); - peer_info.is_connected = true; - // Update addresses if we have new ones - if !addresses.is_empty() { - peer_info.addresses = addresses; - } - - debug!( - "Updated peer info for {} with {} addresses", - peer_id, - peer_info.addresses.len() - ); - - let routing_address = peer_info.addresses.first().map(|addr| addr.to_string()); - let should_add_to_routing = - should_promote && !peer_info.in_routing_table && routing_address.is_some(); - (should_add_to_routing, routing_address) - }; - - if !should_promote { - return; - } - - if !should_add_to_routing { - return; - } - - let Some(address_str) = routing_address else { - warn!( - "Peer {} eligible for routing promotion but has no address", - peer_id - ); - return; - }; - - // Promote to DHT routing table on first non-lookup participation signal. - { - use crate::dht::core_engine::{NodeCapacity, NodeId, NodeInfo}; - - let node_info = NodeInfo { - id: NodeId::from_bytes(dht_key), - address: address_str, - last_seen: SystemTime::now(), - capacity: NodeCapacity::default(), - }; - - if let Err(e) = self.dht.write().await.add_node(node_info).await { - warn!("Failed to add peer {} to DHT routing table: {}", peer_id, e); - return; - } - } + let mut peers = self.dht_peers.write().await; + let peer_info = peers.entry(peer_id.clone()).or_insert_with(|| DhtPeerInfo { + peer_id: peer_id.clone(), + dht_key, + addresses: addresses.clone(), + last_seen: Instant::now(), + is_connected: true, + avg_latency: Duration::from_millis(50), + reliability_score: 1.0, + }); - // Mark local promotion state for this peer. - if let Some(peer_info) = self.dht_peers.write().await.get_mut(&peer_id) { - peer_info.in_routing_table = true; + peer_info.last_seen = Instant::now(); + peer_info.is_connected = true; + // Update addresses if we have new ones + if !addresses.is_empty() { + peer_info.addresses = addresses; } - info!("Added peer {} to DHT routing table", peer_id); - if self.event_tx.receiver_count() > 0 { - let _ = self - .event_tx - .send(DhtNetworkEvent::PeerDiscovered { peer_id, dht_key }); - } + debug!( + "Updated peer info for {} with {} addresses", + peer_id, + peer_info.addresses.len() + ); } /// Reconcile already-connected peers into DHT bookkeeping/routing. @@ -2293,45 +2232,75 @@ impl DhtNetworkManager { Vec::new() }; - // Track peer liveness only. Routing promotion is handled in update_peer_info() - // after non-lookup DHT participation is observed. - let mut peers = self.dht_peers.write().await; - match peers.entry(peer_id.clone()) { - std::collections::hash_map::Entry::Occupied(mut entry) => { - let peer_info = entry.get_mut(); - peer_info.last_seen = Instant::now(); - peer_info.is_connected = true; - if !addresses.is_empty() { - peer_info.addresses = addresses; + // Track peer and decide whether it should be promoted to routing table. + let should_add_to_routing = { + let mut peers = self.dht_peers.write().await; + match peers.entry(peer_id.clone()) { + std::collections::hash_map::Entry::Occupied(mut entry) => { + let peer_info = entry.get_mut(); + let had_addresses = !peer_info.addresses.is_empty(); + peer_info.last_seen = Instant::now(); + peer_info.is_connected = true; + if !addresses.is_empty() { + peer_info.addresses = addresses.clone(); + } + !addresses.is_empty() && !had_addresses + } + std::collections::hash_map::Entry::Vacant(entry) => { + entry.insert(DhtPeerInfo { + peer_id: peer_id.clone(), + dht_key, + addresses: addresses.clone(), + last_seen: Instant::now(), + is_connected: true, + avg_latency: Duration::from_millis(50), + reliability_score: 1.0, + }); + !addresses.is_empty() } } - std::collections::hash_map::Entry::Vacant(entry) => { - entry.insert(DhtPeerInfo { - peer_id: peer_id.clone(), - dht_key, - addresses, - last_seen: Instant::now(), - is_connected: true, - avg_latency: Duration::from_millis(50), - reliability_score: 1.0, - in_routing_table: false, - }); + }; + + // Skip peers with no addresses - they cannot be used for DHT routing. + let address_str = match addresses.first() { + Some(addr) => addr.to_string(), + None => { + warn!( + "Peer {} has no addresses, skipping DHT routing table addition", + peer_id + ); + return; } }; - debug!( - "Peer {} tracked for DHT liveness; routing promotion deferred until non-lookup DHT participation", - peer_id - ); - } + if !should_add_to_routing { + debug!("Peer {} already tracked in DHT routing state", peer_id); + return; + } + + // Add to DHT routing table. + { + use crate::dht::core_engine::{NodeCapacity, NodeId, NodeInfo}; - /// Whether an inbound DHT message should promote the sender to routing. - fn should_promote_to_routing(message: &DhtNetworkMessage) -> bool { - matches!(&message.message_type, DhtMessageType::Request) - && matches!( - &message.payload, - DhtNetworkOperation::Put { .. } | DhtNetworkOperation::Join - ) + let node_info = NodeInfo { + id: NodeId::from_bytes(dht_key), + address: address_str, + last_seen: SystemTime::now(), + capacity: NodeCapacity::default(), + }; + + if let Err(e) = self.dht.write().await.add_node(node_info).await { + warn!("Failed to add peer {} to DHT routing table: {}", peer_id, e); + } else { + info!("Added peer {} to DHT routing table", peer_id); + } + } + + if self.event_tx.receiver_count() > 0 { + let _ = self + .event_tx + .send(DhtNetworkEvent::PeerDiscovered { peer_id, dht_key }); + } } /// Start network event handler @@ -2713,69 +2682,3 @@ impl Default for DhtNetworkConfig { } } } - -#[cfg(test)] -#[allow(clippy::unwrap_used, clippy::expect_used)] -mod tests { - use super::*; - - fn message(payload: DhtNetworkOperation, message_type: DhtMessageType) -> DhtNetworkMessage { - DhtNetworkMessage { - message_id: "test-message".to_string(), - source: "test-source".to_string(), - target: Some("test-target".to_string()), - message_type, - payload, - result: None, - timestamp: 0, - ttl: 1, - hop_count: 0, - } - } - - #[test] - fn test_should_promote_to_routing_skips_lookup_requests() { - let key = [0u8; 32]; - - let find_node = message( - DhtNetworkOperation::FindNode { key }, - DhtMessageType::Request, - ); - assert!(!DhtNetworkManager::should_promote_to_routing(&find_node)); - - let find_value = message( - DhtNetworkOperation::FindValue { key }, - DhtMessageType::Request, - ); - assert!(!DhtNetworkManager::should_promote_to_routing(&find_value)); - - let get = message(DhtNetworkOperation::Get { key }, DhtMessageType::Request); - assert!(!DhtNetworkManager::should_promote_to_routing(&get)); - } - - #[test] - fn test_should_promote_to_routing_for_participation_requests_only() { - let key = [0u8; 32]; - - let put = message( - DhtNetworkOperation::Put { - key, - value: vec![1, 2, 3], - }, - DhtMessageType::Request, - ); - assert!(DhtNetworkManager::should_promote_to_routing(&put)); - - let join = message(DhtNetworkOperation::Join, DhtMessageType::Request); - assert!(DhtNetworkManager::should_promote_to_routing(&join)); - - let put_response = message( - DhtNetworkOperation::Put { - key, - value: vec![1, 2, 3], - }, - DhtMessageType::Response, - ); - assert!(!DhtNetworkManager::should_promote_to_routing(&put_response)); - } -} From 08ccb3f3a0a3d310c678a49783eab67bb7680a65 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Thu, 12 Feb 2026 13:35:51 +0100 Subject: [PATCH 2/7] fix(p2p): fix graceful shutdown for P2PNode --- Cargo.toml | 1 + src/dht/core_engine.rs | 23 ++- src/dht_network_manager.rs | 220 ++++++++++---------- src/network.rs | 48 ++--- src/transport/ant_quic_adapter.rs | 4 +- src/transport_handle.rs | 320 +++++++++++++----------------- tests/network_wiring_e2e_test.rs | 2 +- 7 files changed, 290 insertions(+), 328 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 453b72ee..07c6f418 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -105,6 +105,7 @@ lazy_static = "1.5" log = "0.4" tokio-stream = "0.1" +tokio-util = { version = "0.7", features = ["rt"] } # Health check dependencies axum = "0.8" diff --git a/src/dht/core_engine.rs b/src/dht/core_engine.rs index 310277b1..a51e2edc 100644 --- a/src/dht/core_engine.rs +++ b/src/dht/core_engine.rs @@ -20,9 +20,9 @@ use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; -use std::sync::atomic::{AtomicBool, Ordering}; use std::time::{Duration, SystemTime}; use tokio::sync::{RwLock, oneshot}; +use tokio_util::sync::CancellationToken; use uuid::Uuid; /// DHT key type (256-bit) @@ -522,8 +522,8 @@ pub struct DhtCoreEngine { /// Optional trust-aware peer selector for combining distance with trust scores trust_peer_selector: Option>, - /// Shutdown flag for background maintenance tasks - shutdown: Arc, + /// Shutdown token for background maintenance tasks + shutdown: CancellationToken, } impl DhtCoreEngine { @@ -586,7 +586,7 @@ impl DhtCoreEngine { transport: None, pending_requests: Arc::new(RwLock::new(HashMap::new())), trust_peer_selector: None, - shutdown: Arc::new(AtomicBool::new(false)), + shutdown: CancellationToken::new(), }) } @@ -698,7 +698,7 @@ impl DhtCoreEngine { /// Signal background maintenance tasks to stop pub fn signal_shutdown(&self) { - self.shutdown.store(true, Ordering::Relaxed); + self.shutdown.cancel(); } /// Start background maintenance tasks for security and health @@ -707,16 +707,17 @@ impl DhtCoreEngine { let eviction_manager = self.eviction_manager.clone(); let close_group_validator = self.close_group_validator.clone(); let security_metrics = self.security_metrics.clone(); - let shutdown = Arc::clone(&self.shutdown); + let shutdown = self.shutdown.clone(); tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(60)); loop { - interval.tick().await; - - if shutdown.load(Ordering::Relaxed) { - tracing::info!("DHT core maintenance task shutting down"); - break; + tokio::select! { + _ = interval.tick() => {} + () = shutdown.cancelled() => { + tracing::info!("DHT core maintenance task shutting down"); + break; + } } // 1. Run Bucket Refresh Logic with Validation Integration diff --git a/src/dht_network_manager.rs b/src/dht_network_manager.rs index 212a4603..ec5d6d1a 100644 --- a/src/dht_network_manager.rs +++ b/src/dht_network_manager.rs @@ -29,10 +29,10 @@ use crate::{ use serde::{Deserialize, Serialize}; use std::collections::{BTreeSet, HashMap, HashSet, VecDeque}; use std::net::SocketAddr; -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant, SystemTime}; use tokio::sync::{RwLock, Semaphore, broadcast, oneshot}; +use tokio_util::sync::CancellationToken; use tracing::{debug, info, trace, warn}; use uuid::Uuid; @@ -287,8 +287,8 @@ pub struct DhtNetworkManager { /// Cached local DHT key to avoid repeated SHA-256 hashing /// PERF-001: Eliminates redundant computation in message handlers local_dht_key: DhtKey, - /// Shutdown flag for background tasks - shutdown: Arc, + /// Shutdown token for background tasks + shutdown: CancellationToken, /// Handle for the maintenance task so it can be joined on stop maintenance_handle: Arc>>>, /// Handle for the network event handler task @@ -449,7 +449,7 @@ impl DhtNetworkManager { maintenance_scheduler, message_handler_semaphore, local_dht_key, - shutdown: Arc::new(AtomicBool::new(false)), + shutdown: CancellationToken::new(), maintenance_handle: Arc::new(RwLock::new(None)), event_handler_handle: Arc::new(RwLock::new(None)), }) @@ -661,7 +661,7 @@ impl DhtNetworkManager { self.leave_network().await?; // Signal all background tasks to stop - self.shutdown.store(true, Ordering::Relaxed); + self.shutdown.cancel(); // Signal the DHT core engine's maintenance task to stop self.dht.read().await.signal_shutdown(); @@ -2310,115 +2310,118 @@ impl DhtNetworkManager { // Subscribe to network events from transport layer let mut events = self.transport.subscribe_events(); - let shutdown = Arc::clone(&self.shutdown); + let shutdown = self.shutdown.clone(); let handle = tokio::spawn(async move { loop { - match events.recv().await { - Ok(event) => match event { - crate::network::P2PEvent::PeerConnected(peer_id) => { - self_arc.handle_peer_connected(peer_id).await; - } - crate::network::P2PEvent::PeerDisconnected(peer_id) => { - info!("DHT peer disconnected: {}", peer_id); - - // Update peer status - { - let mut peers = self_arc.dht_peers.write().await; - if let Some(peer_info) = peers.get_mut(&peer_id) { - peer_info.is_connected = false; + tokio::select! { + () = shutdown.cancelled() => { + info!("Network event handler shutting down"); + break; + } + recv = events.recv() => { + match recv { + Ok(event) => match event { + crate::network::P2PEvent::PeerConnected(peer_id) => { + self_arc.handle_peer_connected(peer_id).await; } - } + crate::network::P2PEvent::PeerDisconnected(peer_id) => { + info!("DHT peer disconnected: {}", peer_id); - if self_arc.event_tx.receiver_count() > 0 - && let Err(e) = self_arc - .event_tx - .send(DhtNetworkEvent::PeerDisconnected { peer_id }) - { - warn!("Failed to send PeerDisconnected event: {}", e); - } - } - crate::network::P2PEvent::Message { - topic, - source, - data, - } => { - trace!( - " [EVENT] Message received: topic={}, source={}, {} bytes", - topic, - source, - data.len() - ); - if topic == "/dht/1.0.0" { - trace!(" [EVENT] Processing DHT message from {}", source); - // Process the DHT message with backpressure via semaphore - let manager_clone = Arc::clone(&self_arc); - let source_clone = source.clone(); - let semaphore = Arc::clone(&self_arc.message_handler_semaphore); - tokio::spawn(async move { - // Acquire permit for backpressure - limits concurrent handlers - let _permit = match semaphore.acquire().await { - Ok(permit) => permit, - Err(_) => { - warn!("Message handler semaphore closed"); - return; + // Update peer status + { + let mut peers = self_arc.dht_peers.write().await; + if let Some(peer_info) = peers.get_mut(&peer_id) { + peer_info.is_connected = false; } - }; - - // SEC-001: Wrap handle_dht_message with timeout to prevent DoS via long-running handlers - // This ensures permits are released even if a handler gets stuck - match tokio::time::timeout( - REQUEST_TIMEOUT, - manager_clone.handle_dht_message(&data, &source_clone), - ) - .await + } + + if self_arc.event_tx.receiver_count() > 0 + && let Err(e) = self_arc + .event_tx + .send(DhtNetworkEvent::PeerDisconnected { peer_id }) { - Ok(Ok(Some(response))) => { - // Send response back to the source peer - if let Err(e) = manager_clone - .transport - .send_message(&source_clone, "/dht/1.0.0", response) - .await + warn!("Failed to send PeerDisconnected event: {}", e); + } + } + crate::network::P2PEvent::Message { + topic, + source, + data, + } => { + trace!( + " [EVENT] Message received: topic={}, source={}, {} bytes", + topic, + source, + data.len() + ); + if topic == "/dht/1.0.0" { + trace!(" [EVENT] Processing DHT message from {}", source); + // Process the DHT message with backpressure via semaphore + let manager_clone = Arc::clone(&self_arc); + let source_clone = source.clone(); + let semaphore = Arc::clone(&self_arc.message_handler_semaphore); + tokio::spawn(async move { + // Acquire permit for backpressure - limits concurrent handlers + let _permit = match semaphore.acquire().await { + Ok(permit) => permit, + Err(_) => { + warn!("Message handler semaphore closed"); + return; + } + }; + + // SEC-001: Wrap handle_dht_message with timeout to prevent DoS via long-running handlers + // This ensures permits are released even if a handler gets stuck + match tokio::time::timeout( + REQUEST_TIMEOUT, + manager_clone.handle_dht_message(&data, &source_clone), + ) + .await { - warn!( - "Failed to send DHT response to {}: {}", - source_clone, e - ); + Ok(Ok(Some(response))) => { + // Send response back to the source peer + if let Err(e) = manager_clone + .transport + .send_message(&source_clone, "/dht/1.0.0", response) + .await + { + warn!( + "Failed to send DHT response to {}: {}", + source_clone, e + ); + } + } + Ok(Ok(None)) => { + // No response needed (e.g., for response messages) + } + Ok(Err(e)) => { + warn!( + "Failed to handle DHT message from {}: {}", + source_clone, e + ); + } + Err(_) => { + // Timeout occurred - log warning and release permit + warn!( + "DHT message handler timed out after {:?} for peer {}: potential DoS attempt or slow processing", + REQUEST_TIMEOUT, source_clone + ); + } } - } - Ok(Ok(None)) => { - // No response needed (e.g., for response messages) - } - Ok(Err(e)) => { - warn!( - "Failed to handle DHT message from {}: {}", - source_clone, e - ); - } - Err(_) => { - // Timeout occurred - log warning and release permit - warn!( - "DHT message handler timed out after {:?} for peer {}: potential DoS attempt or slow processing", - REQUEST_TIMEOUT, source_clone - ); - } + // _permit dropped here, releasing semaphore slot + }); } - // _permit dropped here, releasing semaphore slot - }); + } + }, + Err(broadcast::error::RecvError::Lagged(skipped)) => { + warn!("Network event handler lagged, skipped {} events", skipped); + } + Err(broadcast::error::RecvError::Closed) => { + info!("Network event channel closed, stopping event handler"); + break; } } - }, - Err(broadcast::error::RecvError::Lagged(skipped)) => { - warn!("Network event handler lagged, skipped {} events", skipped); } - Err(broadcast::error::RecvError::Closed) => { - info!("Network event channel closed, stopping event handler"); - break; - } - } - - if shutdown.load(Ordering::Relaxed) { - info!("Network event handler shutting down"); - break; } } }); @@ -2444,17 +2447,18 @@ impl DhtNetworkManager { let dht_peers = Arc::clone(&self.dht_peers); let stats = Arc::clone(&self.stats); let event_tx = self.event_tx.clone(); - let shutdown = Arc::clone(&self.shutdown); + let shutdown = self.shutdown.clone(); let handle = tokio::spawn(async move { let mut check_interval = tokio::time::interval(Duration::from_secs(5)); loop { - check_interval.tick().await; - - if shutdown.load(Ordering::Relaxed) { - info!("DHT maintenance task shutting down"); - break; + tokio::select! { + _ = check_interval.tick() => {} + () = shutdown.cancelled() => { + info!("DHT maintenance task shutting down"); + break; + } } // Get due tasks from scheduler diff --git a/src/network.rs b/src/network.rs index 2e9d018c..b6afaa66 100644 --- a/src/network.rs +++ b/src/network.rs @@ -32,6 +32,7 @@ use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; use tokio::sync::{RwLock, broadcast}; +use tokio_util::sync::CancellationToken; use tokio::time::Instant; use tracing::{debug, info, warn}; @@ -696,8 +697,8 @@ pub struct P2PNode { /// Node start time start_time: Instant, - /// Running state - running: RwLock, + /// Shutdown token — cancelled when the node should stop + shutdown: CancellationToken, /// DHT manager for distributed hash table operations (peer discovery, routing, storage) dht_manager: Arc, @@ -882,7 +883,7 @@ impl P2PNode { peer_id, transport, start_time: Instant::now(), - running: RwLock::new(false), + shutdown: CancellationToken::new(), dht_manager, resource_manager, bootstrap_manager, @@ -1224,9 +1225,6 @@ impl P2PNode { info!("Bootstrap cache manager started"); } - // Set running state - *self.running.write().await = true; - // Start transport listeners and message receiving self.transport.start_network_listeners().await?; @@ -1252,23 +1250,25 @@ impl P2PNode { /// Run the P2P node (blocks until shutdown) pub async fn run(&self) -> Result<()> { - if !*self.running.read().await { + if !self.is_running() { self.start().await?; } info!("P2P node running..."); + let mut interval = tokio::time::interval(Duration::from_millis(RUN_LOOP_TICK_INTERVAL_MS)); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + // Main event loop loop { - if !*self.running.read().await { - break; + tokio::select! { + _ = interval.tick() => { + self.transport.maintenance_tick().await?; + } + () = self.shutdown.cancelled() => { + break; + } } - - // Perform periodic maintenance via transport handle - self.transport.maintenance_tick().await?; - - // Sleep for a short interval - tokio::time::sleep(Duration::from_millis(RUN_LOOP_TICK_INTERVAL_MS)).await; } info!("P2P node stopped"); @@ -1279,8 +1279,8 @@ impl P2PNode { pub async fn stop(&self) -> Result<()> { info!("Stopping P2P node..."); - // Set running state to false - *self.running.write().await = false; + // Signal the run loop to exit + self.shutdown.cancel(); // Stop DHT manager first so leave messages can be sent while transport is still active. self.dht_manager.stop().await?; @@ -1307,8 +1307,8 @@ impl P2PNode { } /// Check if the node is running - pub async fn is_running(&self) -> bool { - *self.running.read().await + pub fn is_running(&self) -> bool { + !self.shutdown.is_cancelled() } /// Get the current listen addresses @@ -2093,7 +2093,7 @@ mod tests { let node = P2PNode::new(config).await?; assert_eq!(node.peer_id(), "test_peer_123"); - assert!(!node.is_running().await); + assert!(!node.is_running()); assert_eq!(node.peer_count().await, 0); assert!(node.connected_peers().await.is_empty()); @@ -2109,7 +2109,7 @@ mod tests { // Should have generated a peer ID assert!(node.peer_id().starts_with("peer_")); - assert!(!node.is_running().await); + assert!(!node.is_running()); Ok(()) } @@ -2120,11 +2120,11 @@ mod tests { let node = P2PNode::new(config).await?; // Initially not running - assert!(!node.is_running().await); + assert!(!node.is_running()); // Start the node node.start().await?; - assert!(node.is_running().await); + assert!(node.is_running()); // Check listen addresses were set (at least one) let listen_addrs = node.listen_addrs().await; @@ -2135,7 +2135,7 @@ mod tests { // Stop the node node.stop().await?; - assert!(!node.is_running().await); + assert!(!node.is_running()); Ok(()) } diff --git a/src/transport/ant_quic_adapter.rs b/src/transport/ant_quic_adapter.rs index 273a49e7..f24bb498 100644 --- a/src/transport/ant_quic_adapter.rs +++ b/src/transport/ant_quic_adapter.rs @@ -262,7 +262,7 @@ impl P2PNetworkNode { pub fn spawn_recv_task( &self, tx: tokio::sync::mpsc::Sender<(PeerId, Vec)>, - shutdown: Arc, + shutdown: tokio_util::sync::CancellationToken, ) -> tokio::task::JoinHandle<()> { /// Maximum size of a single received message (16 MB). /// Messages exceeding this limit are dropped to prevent memory exhaustion. @@ -271,7 +271,7 @@ impl P2PNetworkNode { let transport = Arc::clone(&self.transport); tokio::spawn(async move { loop { - if shutdown.load(Ordering::Relaxed) { + if shutdown.is_cancelled() { break; } match transport.endpoint().recv().await { diff --git a/src/transport_handle.rs b/src/transport_handle.rs index c16d1862..7225b902 100644 --- a/src/transport_handle.rs +++ b/src/transport_handle.rs @@ -36,9 +36,9 @@ use crate::{NetworkAddress, PeerId}; use std::collections::{HashMap, HashSet}; use std::net::SocketAddr; use std::sync::Arc; -use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; use tokio::sync::{RwLock, broadcast}; +use tokio_util::sync::CancellationToken; use tokio::task::JoinHandle; use tokio::time::Instant; use tracing::{debug, info, trace, warn}; @@ -95,7 +95,7 @@ pub struct TransportHandle { // Held to keep the Arc alive for background tasks that captured a clone. #[allow(dead_code)] geo_provider: Arc, - shutdown: Arc, + shutdown: CancellationToken, resource_manager: Option>, connection_timeout: Duration, stale_peer_threshold: Duration, @@ -169,6 +169,8 @@ impl TransportHandle { .production_config .map(|prod_config| Arc::new(ResourceManager::new(prod_config))); + let shutdown = CancellationToken::new(); + // Subscribe to connection events BEFORE spawning the monitor task let connection_event_rx = dual_node.subscribe_connection_events(); @@ -179,6 +181,7 @@ impl TransportHandle { let dual_node_clone = Arc::clone(&dual_node); let geo_provider_clone = Arc::clone(&geo_provider); let peer_id_clone = config.peer_id.clone(); + let shutdown_token = shutdown.clone(); let handle = tokio::spawn(async move { Self::connection_lifecycle_monitor_with_rx( @@ -189,21 +192,20 @@ impl TransportHandle { event_tx_clone, geo_provider_clone, peer_id_clone, + shutdown_token, ) .await; }); Arc::new(RwLock::new(Some(handle))) }; - let shutdown = Arc::new(AtomicBool::new(false)); - let keepalive_handle = { let active_conns = Arc::clone(&active_connections); let dual_node_clone = Arc::clone(&dual_node); - let shutdown_clone = Arc::clone(&shutdown); + let token = shutdown.clone(); let handle = tokio::spawn(async move { - Self::keepalive_task(active_conns, dual_node_clone, shutdown_clone).await; + Self::keepalive_task(active_conns, dual_node_clone, token).await; }); Arc::new(RwLock::new(Some(handle))) }; @@ -213,7 +215,7 @@ impl TransportHandle { let active_conns_clone = Arc::clone(&active_connections); let event_tx_clone = event_tx.clone(); let stale_threshold = config.stale_peer_threshold; - let shutdown_clone = Arc::clone(&shutdown); + let token = shutdown.clone(); let handle = tokio::spawn(async move { Self::periodic_maintenance_task( @@ -221,7 +223,7 @@ impl TransportHandle { active_conns_clone, event_tx_clone, stale_threshold, - shutdown_clone, + token, ) .await; }); @@ -294,7 +296,7 @@ impl TransportHandle { })), active_requests: Arc::new(RwLock::new(HashMap::new())), geo_provider: Arc::new(BgpGeoProvider::new()), - shutdown: Arc::new(AtomicBool::new(false)), + shutdown: CancellationToken::new(), resource_manager: None, connection_timeout: Duration::from_secs(TEST_CONNECTION_TIMEOUT_SECS), stale_peer_threshold: Duration::from_secs(TEST_STALE_PEER_THRESHOLD_SECS), @@ -912,15 +914,14 @@ impl TransportHandle { info!("Starting message receiving system"); let (tx, mut rx) = tokio::sync::mpsc::channel(MESSAGE_RECV_CHANNEL_CAPACITY); - let shutdown = Arc::clone(&self.shutdown); let mut handles = Vec::new(); if let Some(v6) = self.dual_node.v6.as_ref() { - handles.push(v6.spawn_recv_task(tx.clone(), Arc::clone(&shutdown))); + handles.push(v6.spawn_recv_task(tx.clone(), self.shutdown.clone())); } if let Some(v4) = self.dual_node.v4.as_ref() { - handles.push(v4.spawn_recv_task(tx.clone(), Arc::clone(&shutdown))); + handles.push(v4.spawn_recv_task(tx.clone(), self.shutdown.clone())); } drop(tx); @@ -1012,103 +1013,49 @@ impl TransportHandle { pub async fn stop(&self) -> Result<()> { info!("Stopping transport..."); - self.shutdown.store(true, Ordering::Relaxed); + self.shutdown.cancel(); self.dual_node.shutdown_endpoints().await; // Await recv system tasks let handles: Vec<_> = self.recv_handles.write().await.drain(..).collect(); - for handle in handles { - match handle.await { - Ok(()) => {} - Err(e) if e.is_cancelled() => { - tracing::debug!("Recv task was cancelled during shutdown"); - } - Err(e) if e.is_panic() => { - tracing::error!("Recv task panicked during shutdown: {:?}", e); - } - Err(e) => { - tracing::warn!("Recv task join error during shutdown: {:?}", e); - } - } - } + Self::join_task_handles(handles, "recv").await; + Self::join_task_slot(&self.listener_handle, "listener").await; + Self::join_task_slot(&self.connection_monitor_handle, "connection monitor").await; + Self::join_task_slot(&self.keepalive_handle, "keepalive").await; + Self::join_task_slot(&self.periodic_tasks_handle, "periodic maintenance").await; - // Await accept loop task - if let Some(handle) = self.listener_handle.write().await.take() { - match handle.await { - Ok(()) => {} - Err(e) if e.is_cancelled() => { - tracing::debug!("Listener task was cancelled during shutdown"); - } - Err(e) if e.is_panic() => { - tracing::error!("Listener task panicked during shutdown: {:?}", e); - } - Err(e) => { - tracing::warn!("Listener task join error during shutdown: {:?}", e); - } - } - } + self.disconnect_all_peers().await?; - // Await connection monitor task - if let Some(handle) = self.connection_monitor_handle.write().await.take() { - match handle.await { - Ok(()) => {} - Err(e) if e.is_cancelled() => { - tracing::debug!("Connection monitor task was cancelled during shutdown"); - } - Err(e) if e.is_panic() => { - tracing::error!("Connection monitor task panicked during shutdown: {:?}", e); - } - Err(e) => { - tracing::warn!( - "Connection monitor task join error during shutdown: {:?}", - e - ); - } - } + info!("Transport stopped"); + Ok(()) + } + + async fn join_task_slot(handle_slot: &RwLock>>, task_name: &str) { + let handle = handle_slot.write().await.take(); + if let Some(handle) = handle { + Self::join_task_handle(handle, task_name).await; } + } - // Await keepalive task - if let Some(handle) = self.keepalive_handle.write().await.take() { - match handle.await { - Ok(()) => {} - Err(e) if e.is_cancelled() => { - tracing::debug!("Keepalive task was cancelled during shutdown"); - } - Err(e) if e.is_panic() => { - tracing::error!("Keepalive task panicked during shutdown: {:?}", e); - } - Err(e) => { - tracing::warn!("Keepalive task join error during shutdown: {:?}", e); - } - } + async fn join_task_handles(handles: Vec>, task_name: &str) { + for handle in handles { + Self::join_task_handle(handle, task_name).await; } + } - // Await periodic maintenance task - if let Some(handle) = self.periodic_tasks_handle.write().await.take() { - match handle.await { - Ok(()) => {} - Err(e) if e.is_cancelled() => { - tracing::debug!("Periodic maintenance task was cancelled during shutdown"); - } - Err(e) if e.is_panic() => { - tracing::error!( - "Periodic maintenance task panicked during shutdown: {:?}", - e - ); - } - Err(e) => { - tracing::warn!( - "Periodic maintenance task join error during shutdown: {:?}", - e - ); - } + async fn join_task_handle(handle: JoinHandle<()>, task_name: &str) { + match handle.await { + Ok(()) => {} + Err(e) if e.is_cancelled() => { + tracing::debug!("{task_name} task was cancelled during shutdown"); + } + Err(e) if e.is_panic() => { + tracing::error!("{task_name} task panicked during shutdown: {:?}", e); + } + Err(e) => { + tracing::warn!("{task_name} task join error during shutdown: {:?}", e); } } - - self.disconnect_all_peers().await?; - - info!("Transport stopped"); - Ok(()) } /// Run periodic maintenance: detect stale peers and clean up. @@ -1167,92 +1114,101 @@ impl TransportHandle { event_tx: broadcast::Sender, geo_provider: Arc, _local_peer_id: String, + shutdown: CancellationToken, ) { info!("Connection lifecycle monitor started (pre-subscribed receiver)"); loop { - match event_rx.recv().await { - Ok(event) => match event { - ConnectionEvent::Established { - peer_id, - remote_address, - } => { - let peer_id_str = ant_peer_id_to_string(&peer_id); - debug!( - "Connection established: peer={}, addr={}", - peer_id_str, remote_address - ); - - let ip = remote_address.ip(); - let is_rejected = match ip { - std::net::IpAddr::V4(v4) => { - if let Some(asn) = geo_provider.lookup_ipv4_asn(v4) { - geo_provider.is_hosting_asn(asn) || geo_provider.is_vpn_asn(asn) - } else { - false + tokio::select! { + () = shutdown.cancelled() => { + info!("Connection lifecycle monitor shutting down"); + break; + } + recv = event_rx.recv() => { + match recv { + Ok(event) => match event { + ConnectionEvent::Established { + peer_id, + remote_address, + } => { + let peer_id_str = ant_peer_id_to_string(&peer_id); + debug!( + "Connection established: peer={}, addr={}", + peer_id_str, remote_address + ); + + let ip = remote_address.ip(); + let is_rejected = match ip { + std::net::IpAddr::V4(v4) => { + if let Some(asn) = geo_provider.lookup_ipv4_asn(v4) { + geo_provider.is_hosting_asn(asn) || geo_provider.is_vpn_asn(asn) + } else { + false + } + } + std::net::IpAddr::V6(v6) => { + let info = geo_provider.lookup(v6); + info.is_hosting_provider || info.is_vpn_provider + } + }; + + if is_rejected { + info!( + "Rejecting connection from {} ({}) due to GeoIP policy", + peer_id_str, remote_address + ); + dual_node.disconnect_peer(&peer_id).await; + continue; } - } - std::net::IpAddr::V6(v6) => { - let info = geo_provider.lookup(v6); - info.is_hosting_provider || info.is_vpn_provider - } - }; - if is_rejected { - info!( - "Rejecting connection from {} ({}) due to GeoIP policy", - peer_id_str, remote_address - ); - dual_node.disconnect_peer(&peer_id).await; - continue; - } + active_connections.write().await.insert(peer_id_str.clone()); - active_connections.write().await.insert(peer_id_str.clone()); + let mut peers_lock = peers.write().await; + if let Some(peer_info) = peers_lock.get_mut(&peer_id_str) { + peer_info.status = ConnectionStatus::Connected; + peer_info.connected_at = Instant::now(); + } else { + debug!("Registering new incoming peer: {}", peer_id_str); + peers_lock.insert( + peer_id_str.clone(), + PeerInfo { + peer_id: peer_id_str.clone(), + addresses: vec![remote_address.to_string()], + status: ConnectionStatus::Connected, + last_seen: Instant::now(), + connected_at: Instant::now(), + protocols: Vec::new(), + heartbeat_count: 0, + }, + ); + } - let mut peers_lock = peers.write().await; - if let Some(peer_info) = peers_lock.get_mut(&peer_id_str) { - peer_info.status = ConnectionStatus::Connected; - peer_info.connected_at = Instant::now(); - } else { - debug!("Registering new incoming peer: {}", peer_id_str); - peers_lock.insert( - peer_id_str.clone(), - PeerInfo { - peer_id: peer_id_str.clone(), - addresses: vec![remote_address.to_string()], - status: ConnectionStatus::Connected, - last_seen: Instant::now(), - connected_at: Instant::now(), - protocols: Vec::new(), - heartbeat_count: 0, - }, + broadcast_event(&event_tx, P2PEvent::PeerConnected(peer_id_str)); + } + ConnectionEvent::Lost { peer_id, reason } + | ConnectionEvent::Failed { peer_id, reason } => { + let peer_id_str = ant_peer_id_to_string(&peer_id); + debug!("Connection lost/failed: peer={peer_id_str}, reason={reason}"); + + active_connections.write().await.remove(&peer_id_str); + if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) { + peer_info.status = ConnectionStatus::Disconnected; + peer_info.last_seen = Instant::now(); + } + broadcast_event(&event_tx, P2PEvent::PeerDisconnected(peer_id_str)); + } + }, + Err(broadcast::error::RecvError::Lagged(skipped)) => { + warn!( + "Connection event receiver lagged, skipped {} events", + skipped ); } - - broadcast_event(&event_tx, P2PEvent::PeerConnected(peer_id_str)); - } - ConnectionEvent::Lost { peer_id, reason } - | ConnectionEvent::Failed { peer_id, reason } => { - let peer_id_str = ant_peer_id_to_string(&peer_id); - debug!("Connection lost/failed: peer={peer_id_str}, reason={reason}"); - - active_connections.write().await.remove(&peer_id_str); - if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) { - peer_info.status = ConnectionStatus::Disconnected; - peer_info.last_seen = Instant::now(); + Err(broadcast::error::RecvError::Closed) => { + info!("Connection event channel closed, stopping lifecycle monitor"); + break; } - broadcast_event(&event_tx, P2PEvent::PeerDisconnected(peer_id_str)); } - }, - Err(broadcast::error::RecvError::Lagged(skipped)) => { - warn!( - "Connection event receiver lagged, skipped {} events", - skipped - ); - } - Err(broadcast::error::RecvError::Closed) => { - info!("Connection event channel closed, stopping lifecycle monitor"); - break; } } } @@ -1262,7 +1218,7 @@ impl TransportHandle { async fn keepalive_task( active_connections: Arc>>, dual_node: Arc, - shutdown: Arc, + shutdown: CancellationToken, ) { const KEEPALIVE_INTERVAL_SECS: u64 = 15; @@ -1275,13 +1231,14 @@ impl TransportHandle { ); loop { - if shutdown.load(Ordering::Relaxed) { - info!("Keepalive task shutting down"); - break; + tokio::select! { + _ = interval.tick() => {} + () = shutdown.cancelled() => { + info!("Keepalive task shutting down"); + break; + } } - interval.tick().await; - let peers: Vec = { active_connections.read().await.iter().cloned().collect() }; if peers.is_empty() { @@ -1322,7 +1279,7 @@ impl TransportHandle { active_connections: Arc>>, event_tx: broadcast::Sender, stale_threshold: Duration, - shutdown: Arc, + shutdown: CancellationToken, ) { let cleanup_threshold = stale_threshold * CLEANUP_THRESHOLD_MULTIPLIER; let mut interval = tokio::time::interval(Duration::from_millis(MAINTENANCE_INTERVAL_MS)); @@ -1334,10 +1291,9 @@ impl TransportHandle { ); loop { - interval.tick().await; - - if shutdown.load(Ordering::Relaxed) { - break; + tokio::select! { + _ = interval.tick() => {} + () = shutdown.cancelled() => break, } let (peers_to_remove, peers_to_mark_disconnected) = { diff --git a/tests/network_wiring_e2e_test.rs b/tests/network_wiring_e2e_test.rs index b28e9aa4..a5585a14 100644 --- a/tests/network_wiring_e2e_test.rs +++ b/tests/network_wiring_e2e_test.rs @@ -2017,7 +2017,7 @@ async fn test_graceful_shutdown() { ); // Verify node1 is no longer running - let is_running = node1.is_running().await; + let is_running = node1.is_running(); assert!(!is_running, "Node should not be running after stop()"); info!("=== TEST PASSED: Graceful Shutdown ==="); From 1911540a0ed65280975439d3fd3d3d683040dd37 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Thu, 12 Feb 2026 14:36:17 +0100 Subject: [PATCH 3/7] fix(transport): use tokio::select! in spawn_recv_task for immediate shutdown Replace check-then-await pattern with tokio::select! to race the cancellation token against recv(), eliminating a race condition where shutdown could be missed between the is_cancelled() check and blocking on recv(). Co-Authored-By: Claude Opus 4.6 --- src/transport/ant_quic_adapter.rs | 40 +++++++++++++++++-------------- 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/src/transport/ant_quic_adapter.rs b/src/transport/ant_quic_adapter.rs index f24bb498..121c060e 100644 --- a/src/transport/ant_quic_adapter.rs +++ b/src/transport/ant_quic_adapter.rs @@ -271,26 +271,30 @@ impl P2PNetworkNode { let transport = Arc::clone(&self.transport); tokio::spawn(async move { loop { - if shutdown.is_cancelled() { - break; - } - match transport.endpoint().recv().await { - Ok((peer_id, data)) => { - if data.len() > MAX_RECV_MESSAGE_SIZE { - tracing::warn!( - "Dropping oversized message ({} bytes) from peer", - data.len() - ); - continue; - } - if tx.send((peer_id, data)).await.is_err() { - break; // channel closed - } - } - Err(e) => { - tracing::debug!("Recv task exiting: {e}"); + tokio::select! { + _ = shutdown.cancelled() => { break; } + result = transport.endpoint().recv() => { + match result { + Ok((peer_id, data)) => { + if data.len() > MAX_RECV_MESSAGE_SIZE { + tracing::warn!( + "Dropping oversized message ({} bytes) from peer", + data.len() + ); + continue; + } + if tx.send((peer_id, data)).await.is_err() { + break; // channel closed + } + } + Err(e) => { + tracing::debug!("Recv task exiting: {e}"); + break; + } + } + } } } }) From 5587f235f0acfa8bf544ce4e43894e375ab6148d Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Thu, 12 Feb 2026 14:53:56 +0100 Subject: [PATCH 4/7] style: fix import ordering in network.rs and transport_handle.rs Co-Authored-By: Claude Opus 4.6 --- src/network.rs | 2 +- src/transport_handle.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/network.rs b/src/network.rs index b6afaa66..ee5f8848 100644 --- a/src/network.rs +++ b/src/network.rs @@ -32,8 +32,8 @@ use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; use tokio::sync::{RwLock, broadcast}; -use tokio_util::sync::CancellationToken; use tokio::time::Instant; +use tokio_util::sync::CancellationToken; use tracing::{debug, info, warn}; /// Wire protocol message format for P2P communication. diff --git a/src/transport_handle.rs b/src/transport_handle.rs index 7225b902..77b799e6 100644 --- a/src/transport_handle.rs +++ b/src/transport_handle.rs @@ -38,9 +38,9 @@ use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; use tokio::sync::{RwLock, broadcast}; -use tokio_util::sync::CancellationToken; use tokio::task::JoinHandle; use tokio::time::Instant; +use tokio_util::sync::CancellationToken; use tracing::{debug, info, trace, warn}; /// Background task maintenance interval in milliseconds. From 769fd8addb385af3671bdbb2c3162a4ab259be39 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Thu, 12 Feb 2026 15:11:27 +0100 Subject: [PATCH 5/7] fix(network): track started state so is_running() is false before start() is_running() only checked !shutdown.is_cancelled(), which is true for a freshly constructed CancellationToken. Add an is_started AtomicBool that is set on start() and cleared on stop() so the method correctly reports false before the node is started. Co-Authored-By: Claude Opus 4.6 --- src/network.rs | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/network.rs b/src/network.rs index ee5f8848..34e24b71 100644 --- a/src/network.rs +++ b/src/network.rs @@ -715,6 +715,9 @@ pub struct P2PNode { /// Bootstrap state tracking - indicates whether peer discovery has completed is_bootstrapped: Arc, + /// Whether `start()` has been called (and `stop()` has not yet completed) + is_started: Arc, + /// EigenTrust engine for reputation management /// /// Used to track peer reliability based on data availability outcomes. @@ -889,6 +892,7 @@ impl P2PNode { bootstrap_manager, security_dashboard, is_bootstrapped: Arc::new(AtomicBool::new(false)), + is_started: Arc::new(AtomicBool::new(false)), trust_engine, }; info!( @@ -1242,6 +1246,9 @@ impl P2PNode { // Connect to bootstrap peers self.connect_bootstrap_peers().await?; + self.is_started + .store(true, std::sync::atomic::Ordering::Release); + Ok(()) } @@ -1297,6 +1304,9 @@ impl P2PNode { info!("Production resource manager stopped"); } + self.is_started + .store(false, std::sync::atomic::Ordering::Release); + info!("P2P node stopped"); Ok(()) } @@ -1308,7 +1318,8 @@ impl P2PNode { /// Check if the node is running pub fn is_running(&self) -> bool { - !self.shutdown.is_cancelled() + self.is_started.load(std::sync::atomic::Ordering::Acquire) + && !self.shutdown.is_cancelled() } /// Get the current listen addresses From c6d2c5a3f731df4b003351f28517c6997a662a03 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Thu, 12 Feb 2026 15:22:21 +0100 Subject: [PATCH 6/7] style: fix rustfmt formatting in is_running() Co-Authored-By: Claude Opus 4.6 --- src/network.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/network.rs b/src/network.rs index 34e24b71..a04cf87d 100644 --- a/src/network.rs +++ b/src/network.rs @@ -1318,8 +1318,7 @@ impl P2PNode { /// Check if the node is running pub fn is_running(&self) -> bool { - self.is_started.load(std::sync::atomic::Ordering::Acquire) - && !self.shutdown.is_cancelled() + self.is_started.load(std::sync::atomic::Ordering::Acquire) && !self.shutdown.is_cancelled() } /// Get the current listen addresses From 46497c1f225907a4549f0f42480d9b6f5784fba2 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Thu, 12 Feb 2026 17:18:26 +0100 Subject: [PATCH 7/7] fix(transport): use CancellationToken for adapter shutdown --- src/transport/ant_quic_adapter.rs | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/src/transport/ant_quic_adapter.rs b/src/transport/ant_quic_adapter.rs index 121c060e..a87467b0 100644 --- a/src/transport/ant_quic_adapter.rs +++ b/src/transport/ant_quic_adapter.rs @@ -64,10 +64,10 @@ use anyhow::Result; use std::collections::HashMap; use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; -use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; use tokio::sync::{RwLock, broadcast}; use tokio::time::sleep; +use tokio_util::sync::CancellationToken; use tracing::info; // Import ant-quic types using the new LinkTransport API (0.14+) @@ -115,7 +115,7 @@ pub struct P2PNetworkNode { /// Connection event broadcaster event_tx: broadcast::Sender, /// Shutdown signal for event polling task - shutdown: Arc, + shutdown: CancellationToken, /// Event forwarder task handle event_task_handle: Option>, /// Geographic configuration for diversity enforcement @@ -308,20 +308,22 @@ impl P2PNetworkNode { transport.register_protocol(SAORSA_DHT_PROTOCOL); let (event_tx, _) = broadcast::channel(crate::DEFAULT_EVENT_CHANNEL_CAPACITY); - let shutdown = Arc::new(AtomicBool::new(false)); + let shutdown = CancellationToken::new(); // Start event forwarder that maps LinkEvent to ConnectionEvent let mut link_events = transport.subscribe(); let event_tx_clone = event_tx.clone(); - let shutdown_clone = Arc::clone(&shutdown); + let shutdown_clone = shutdown.clone(); let peers_clone = Arc::new(RwLock::new(Vec::new())); let peers_for_task = Arc::clone(&peers_clone); let peer_quality = Arc::new(RwLock::new(HashMap::new())); let peer_quality_for_task = Arc::clone(&peer_quality); let event_task_handle = Some(tokio::spawn(async move { - while !shutdown_clone.load(Ordering::Relaxed) { - match link_events.recv().await { + loop { + tokio::select! { + () = shutdown_clone.cancelled() => break, + recv = link_events.recv() => match recv { Ok(LinkEvent::PeerConnected { peer, caps }) => { // Capture quality score from ant-quic Capabilities let quality = caps.quality_score(); @@ -376,7 +378,7 @@ impl P2PNetworkNode { continue; } _ => {} - } + }} } })); @@ -889,13 +891,15 @@ impl P2PNetworkNode { pub async fn shutdown(&mut self) { tracing::info!("Shutting down P2PNetworkNode"); - self.shutdown.store(true, Ordering::Relaxed); + self.shutdown.cancel(); + + // Stop transport first so the link event stream closes and any + // event-forwarder task blocked on recv() can exit. + self.transport.shutdown().await; if let Some(handle) = self.event_task_handle.take() { let _ = handle.await; } - - self.transport.shutdown().await; } }