From 59c477bc94c1626df1787229bcf9fff035c4d84d Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Fri, 30 Jan 2026 14:21:40 +0100 Subject: [PATCH] fix: remove duplicate receive loops by moving networking setup from new() to start() P2PNode::new() and start() both called start_network_listeners() and start_message_receiving_system(), spawning duplicate accept and receive loops competing on the same QUIC endpoint. This caused messages to be randomly split between loops, leading to flaky delivery. Now new() only constructs the struct; start() is the single place where background loops are spawned. All callers updated to call start() after new(). Co-Authored-By: Claude Opus 4.5 --- examples/chat.rs | 1 + examples/test_network.rs | 1 + src/messaging/service.rs | 1 + src/network.rs | 17 ++---- .../connection_lifecycle_integration_test.rs | 6 ++ tests/connection_lifecycle_proof_test.rs | 2 + tests/end_to_end_scenarios_test.rs | 3 +- tests/network_wiring_e2e_test.rs | 60 +++++++++++++++++++ 8 files changed, 77 insertions(+), 14 deletions(-) diff --git a/examples/chat.rs b/examples/chat.rs index cd9551bb..7d6a4e1d 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -64,6 +64,7 @@ async fn main() -> Result<()> { // Create and start the node let node = P2PNode::new(config).await?; + node.start().await?; // Handle bootstrap peers let mut bootstrap_addrs: Vec = Vec::new(); diff --git a/examples/test_network.rs b/examples/test_network.rs index 2a1f1649..1a30db63 100644 --- a/examples/test_network.rs +++ b/examples/test_network.rs @@ -71,6 +71,7 @@ impl TestNode { .await .context("Failed to create P2P node")?, ); + node.start().await.context("Failed to start P2P node")?; // Get actual listen addresses after node creation let actual_addrs = node.listen_addrs().await; diff --git a/src/messaging/service.rs b/src/messaging/service.rs index fc85cb7a..594283be 100644 --- a/src/messaging/service.rs +++ b/src/messaging/service.rs @@ -293,6 +293,7 @@ impl MessagingService { } let node = crate::network::P2PNode::new(node_config).await?; + node.start().await?; Arc::new(node) }; let transport = Arc::new(MessageTransport::new(network, dht_client.clone()).await?); diff --git a/src/network.rs b/src/network.rs index 45419342..88d6fc9c 100644 --- a/src/network.rs +++ b/src/network.rs @@ -1037,17 +1037,7 @@ impl P2PNode { entangled_id: None, binary_hash, }; - info!("Created P2P node with peer ID: {}", node.peer_id); - - // Start the network listeners to populate listen addresses - node.start_network_listeners().await?; - - // Update the connection monitor with actual peers reference - node.start_connection_monitor().await; - - // Start message receiving system so messages work immediately after node creation - // This is critical for basic P2P messaging to work - node.start_message_receiving_system().await?; + info!("Created P2P node with peer ID: {} (call start() to begin networking)", node.peer_id); Ok(node) } @@ -1159,12 +1149,13 @@ impl P2PNode { // Start listening on configured addresses using transport layer self.start_network_listeners().await?; + // Update the connection monitor with actual peers reference + self.start_connection_monitor().await; + // Log current listen addresses let listen_addrs = self.listen_addrs.read().await; info!("P2P node started on addresses: {:?}", *listen_addrs); - // MCP removed - // Start message receiving system self.start_message_receiving_system().await?; diff --git a/tests/connection_lifecycle_integration_test.rs b/tests/connection_lifecycle_integration_test.rs index 3323822c..022e2bd5 100644 --- a/tests/connection_lifecycle_integration_test.rs +++ b/tests/connection_lifecycle_integration_test.rs @@ -66,7 +66,9 @@ async fn test_connection_lifecycle_with_keepalive() { }; let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); // Get their addresses let addrs1 = node1.listen_addrs().await; @@ -182,7 +184,9 @@ async fn test_send_message_validates_connection_state() { }; let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); // Get addresses and connect let addrs2 = node2.listen_addrs().await; @@ -265,7 +269,9 @@ async fn test_multiple_message_exchanges() { }; let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); // Connect nodes let addrs2 = node2.listen_addrs().await; diff --git a/tests/connection_lifecycle_proof_test.rs b/tests/connection_lifecycle_proof_test.rs index 64780115..d212d0cd 100644 --- a/tests/connection_lifecycle_proof_test.rs +++ b/tests/connection_lifecycle_proof_test.rs @@ -37,6 +37,7 @@ async fn test_connection_lifecycle_infrastructure_exists() { }; let node = P2PNode::new(config).await.expect("Failed to create node"); + node.start().await.expect("Failed to start node"); info!("Node created successfully"); @@ -112,6 +113,7 @@ async fn test_keepalive_task_initialized() { }; let _node = P2PNode::new(config).await.expect("Failed to create node"); + _node.start().await.expect("Failed to start node"); // The keepalive task is spawned in P2PNode::new() and runs in the background // It sends keepalive messages every 15 seconds to prevent the 30-second ant-quic timeout diff --git a/tests/end_to_end_scenarios_test.rs b/tests/end_to_end_scenarios_test.rs index 4b62dfdb..228bfa4d 100644 --- a/tests/end_to_end_scenarios_test.rs +++ b/tests/end_to_end_scenarios_test.rs @@ -46,6 +46,7 @@ impl TestUser { let peer_id = format!("test_user_{}", username); let node = P2PNode::new(config).await?; + node.start().await?; Ok(Self { node: Arc::new(node), @@ -55,7 +56,7 @@ impl TestUser { } async fn start(&self) -> Result<()> { - // TODO: Implement when node.start() API is available + // Node is already started after P2PNode::new() + start() in TestUser::new() sleep(Duration::from_millis(100)).await; Ok(()) } diff --git a/tests/network_wiring_e2e_test.rs b/tests/network_wiring_e2e_test.rs index dd7f5128..f5b940de 100644 --- a/tests/network_wiring_e2e_test.rs +++ b/tests/network_wiring_e2e_test.rs @@ -113,7 +113,9 @@ async fn test_two_node_message_exchange() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); // Subscribe to events on node2 BEFORE connecting let mut events2 = node2.subscribe_events(); @@ -217,7 +219,9 @@ async fn test_message_topic_preservation() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let mut events2 = node2.subscribe_events(); @@ -282,7 +286,9 @@ async fn test_bidirectional_message_exchange() { let config2 = create_test_node_config(); let node1 = Arc::new(P2PNode::new(config1).await.expect("Failed to create node1")); + node1.start().await.expect("Failed to start node1"); let node2 = Arc::new(P2PNode::new(config2).await.expect("Failed to create node2")); + node2.start().await.expect("Failed to start node2"); let mut events1 = node1.subscribe_events(); let mut events2 = node2.subscribe_events(); @@ -369,7 +375,9 @@ async fn test_periodic_tasks_updates_last_seen() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); // Connect let addrs2 = node2.listen_addrs().await; @@ -423,7 +431,9 @@ async fn test_stale_peer_removal() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let mut events1 = node1.subscribe_events(); @@ -485,7 +495,9 @@ async fn test_heartbeat_keeps_connection_alive() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let addrs2 = node2.listen_addrs().await; let addr2 = addrs2.first().expect("Need address").to_string(); @@ -535,6 +547,7 @@ async fn test_dht_network_manager_integration() { let config = create_test_node_config(); let _node = P2PNode::new(config).await.expect("Failed to create node"); + _node.start().await.expect("Failed to start node"); // Check if DHT network manager is accessible // This would require adding a method to P2PNode like: @@ -575,12 +588,15 @@ async fn test_three_node_dht_routing() { let node_a = P2PNode::new(config_a) .await .expect("Failed to create node A"); + node_a.start().await.expect("Failed to start node A"); let node_b = P2PNode::new(config_b) .await .expect("Failed to create node B"); + node_b.start().await.expect("Failed to start node B"); let node_c = P2PNode::new(config_c) .await .expect("Failed to create node C"); + node_c.start().await.expect("Failed to start node C"); // Get addresses let addrs_a = node_a.listen_addrs().await; @@ -655,7 +671,9 @@ async fn test_dht_message_routing() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let mut events2 = node2.subscribe_events(); @@ -699,6 +717,7 @@ async fn test_dht_message_routing() { async fn test_node_creation_sanity() { let config = create_test_node_config(); let node = P2PNode::new(config).await.expect("Failed to create node"); + node.start().await.expect("Failed to start node"); let addrs = node.listen_addrs().await; assert!( @@ -716,7 +735,9 @@ async fn test_event_subscription_sanity() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let mut events1 = node1.subscribe_events(); @@ -760,7 +781,9 @@ async fn test_simple_ping_pong() { let config2 = create_test_node_config(); let node1 = Arc::new(P2PNode::new(config1).await.expect("Failed to create node1")); + node1.start().await.expect("Failed to start node1"); let node2 = Arc::new(P2PNode::new(config2).await.expect("Failed to create node2")); + node2.start().await.expect("Failed to start node2"); let mut events1 = node1.subscribe_events(); let mut events2 = node2.subscribe_events(); @@ -841,7 +864,9 @@ async fn test_multiple_sequential_messages() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let mut events2 = node2.subscribe_events(); @@ -923,7 +948,9 @@ async fn test_connection_stays_alive() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let mut events2 = node2.subscribe_events(); @@ -981,7 +1008,9 @@ async fn test_reconnection_works() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let addrs2 = node2.listen_addrs().await; let addr2 = addrs2.first().expect("Need address").to_string(); @@ -1044,7 +1073,9 @@ async fn test_peer_events_sequence() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let mut events1 = node1.subscribe_events(); @@ -1103,7 +1134,9 @@ async fn test_large_message_transfer() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let mut events2 = node2.subscribe_events(); @@ -1168,7 +1201,9 @@ async fn test_multiple_protocols() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let mut events2 = node2.subscribe_events(); @@ -1247,7 +1282,9 @@ async fn test_no_duplicate_disconnect_events() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let mut events1 = node1.subscribe_events(); @@ -1315,7 +1352,9 @@ async fn test_peer_cleanup_timing() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let addrs2 = node2.listen_addrs().await; let addr2 = addrs2.first().expect("Need address").to_string(); @@ -1384,7 +1423,9 @@ async fn test_empty_message_handling() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let mut events2 = node2.subscribe_events(); @@ -1465,7 +1506,9 @@ async fn test_rapid_reconnection_stress() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let addrs2 = node2.listen_addrs().await; let addr2 = addrs2.first().expect("Need address").to_string(); @@ -1515,7 +1558,9 @@ async fn test_concurrent_message_flood() { let config2 = create_test_node_config(); let node1 = Arc::new(P2PNode::new(config1).await.expect("Failed to create node1")); + node1.start().await.expect("Failed to start node1"); let node2 = Arc::new(P2PNode::new(config2).await.expect("Failed to create node2")); + node2.start().await.expect("Failed to start node2"); let mut events1 = node1.subscribe_events(); let mut events2 = node2.subscribe_events(); @@ -1639,7 +1684,9 @@ async fn test_send_to_disconnecting_peer() { let config2 = create_test_node_config(); let node1 = Arc::new(P2PNode::new(config1).await.expect("Failed to create node1")); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let addrs2 = node2.listen_addrs().await; let addr2 = addrs2.first().expect("Need address").to_string(); @@ -1679,6 +1726,7 @@ async fn test_send_to_disconnecting_peer() { // Verify node1 is still functional let config3 = create_test_node_config(); let node3 = P2PNode::new(config3).await.expect("Failed to create node3"); + node3.start().await.expect("Failed to start node3"); let addrs3 = node3.listen_addrs().await; let addr3 = addrs3.first().expect("Need address").to_string(); @@ -1707,7 +1755,9 @@ async fn test_late_event_subscription() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); // Connect BEFORE subscribing let addrs2 = node2.listen_addrs().await; @@ -1762,7 +1812,9 @@ async fn test_zero_stale_threshold() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let addrs2 = node2.listen_addrs().await; let addr2 = addrs2.first().expect("Need address").to_string(); @@ -1812,7 +1864,9 @@ async fn test_short_stale_threshold() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let mut events2 = node2.subscribe_events(); @@ -1858,6 +1912,7 @@ async fn test_many_peers_scaling() { let config1 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let peer_count = 10; let mut nodes: Vec = Vec::with_capacity(peer_count); @@ -1869,6 +1924,7 @@ async fn test_many_peers_scaling() { let node = P2PNode::new(config) .await .expect("Failed to create peer node"); + node.start().await.expect("Failed to start peer node"); let addrs = node.listen_addrs().await; let addr = addrs.first().expect("Need address").to_string(); @@ -1931,7 +1987,9 @@ async fn test_graceful_shutdown() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let addrs2 = node2.listen_addrs().await; let addr2 = addrs2.first().expect("Need address").to_string(); @@ -1969,7 +2027,9 @@ async fn test_event_subscriber_cleanup() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); // Create multiple subscribers let sub1 = node2.subscribe_events();