fix(p2p): graceful shutdown and DHT routing table fix #26
Conversation
Pull request overview
Update P2P shutdown semantics to use cooperative cancellation, and adjust DHT routing table promotion behavior to restore routing population from general peer interactions.
Changes:
- Replace `AtomicBool` shutdown flags with `tokio_util::sync::CancellationToken` across node/transport/DHT components.
- Refactor shutdown teardown to reduce “channel closed”/join noise and improve task cancellation handling.
- Rework DHT routing promotion logic (and remove participation-only promotion + its tests).
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| tests/network_wiring_e2e_test.rs | Updates e2e test to match P2PNode::is_running() API change. |
| src/transport_handle.rs | Converts transport shutdown to CancellationToken and centralizes join/teardown logic. |
| src/transport/ant_quic_adapter.rs | Updates recv task shutdown checks to use CancellationToken. |
| src/network.rs | Replaces running state with a node-level CancellationToken and updates run/stop/is_running. |
| src/dht_network_manager.rs | Switches to CancellationToken and revises peer tracking/routing promotion behavior; removes old promotion tests. |
| src/dht/core_engine.rs | Switches core maintenance shutdown to CancellationToken and select!-based exit. |
| Cargo.toml | Adds tokio-util dependency to support CancellationToken. |
```rust
pub fn is_running(&self) -> bool {
    !self.shutdown.is_cancelled()
}
```
is_running() is now derived solely from shutdown.is_cancelled(), but shutdown is initialized as a fresh (non-cancelled) token in new(). That makes a newly constructed node report is_running() == true even before start() is called, which contradicts the semantics used by the existing tests (e.g., test_p2pnode_initialization). Consider reintroducing an explicit running/started state, or storing the shutdown token in a replaceable slot (e.g., RwLock<CancellationToken>) so start() creates a new token and stop() cancels it; then is_running() can reflect “started && not cancelled”.
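The suggested "started && not cancelled" shape can be sketched in a std-only model (`Token` below is an illustrative stand-in for `tokio_util::sync::CancellationToken`; the `Node` type and method names are hypothetical, not the PR's actual API):

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};

/// Std-only stand-in for tokio_util::sync::CancellationToken (illustrative only).
#[derive(Clone, Default)]
struct Token(Arc<AtomicBool>);

impl Token {
    fn cancel(&self) {
        self.0.store(true, Ordering::Release);
    }
    fn is_cancelled(&self) -> bool {
        self.0.load(Ordering::Acquire)
    }
}

/// Hypothetical node shape: a replaceable token plus an explicit started flag,
/// so is_running() is false before start() and restart gets a fresh token.
struct Node {
    started: AtomicBool,
    shutdown: RwLock<Token>,
}

impl Node {
    fn new() -> Self {
        Self {
            started: AtomicBool::new(false),
            shutdown: RwLock::new(Token::default()),
        }
    }

    fn start(&self) {
        // A fresh token per run: a cancelled token can never be "uncancelled".
        *self.shutdown.write().unwrap() = Token::default();
        self.started.store(true, Ordering::Release);
    }

    fn stop(&self) {
        self.shutdown.read().unwrap().cancel();
        self.started.store(false, Ordering::Release);
    }

    fn is_running(&self) -> bool {
        self.started.load(Ordering::Acquire) && !self.shutdown.read().unwrap().is_cancelled()
    }
}
```

With this shape a freshly constructed node reports not-running, and stop()/start() cycles work because each run gets its own token.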
```diff
 loop {
-    if shutdown.load(Ordering::Relaxed) {
+    if shutdown.is_cancelled() {
         break;
     }
     match transport.endpoint().recv().await {
```
The recv loop checks shutdown.is_cancelled() only before awaiting endpoint().recv().await, so shutdown may block indefinitely if no packets arrive. Use tokio::select! to await either shutdown.cancelled() or transport.endpoint().recv() so the task can exit promptly on shutdown.
```rust
/// Update peer information
async fn update_peer_info(&self, peer_id: PeerId, _message: &DhtNetworkMessage) {
    let dht_key = crate::dht::derive_dht_key_from_peer_id(&peer_id);
```
update_peer_info() no longer promotes peers into the routing table at all (the previous promotion block was removed), but it still populates DhtPeerInfo with addresses. Combined with the current handle_peer_connected() logic (which only adds to the routing table when addresses are newly discovered), a peer first seen via DHT traffic can end up never being added to the routing table because it already has addresses by the time it connects. To restore “promote on any valid interaction,” either (a) re-add routing table insertion in update_peer_info() when an address is available, or (b) make handle_peer_connected() attempt add_node whenever the peer is connected and has an address (ideally with an explicit ‘already added’ guard or an idempotent/‘already present’ handling path).
```rust
// Track peer and decide whether it should be promoted to routing table.
let should_add_to_routing = {
    let mut peers = self.dht_peers.write().await;
    match peers.entry(peer_id.clone()) {
        std::collections::hash_map::Entry::Occupied(mut entry) => {
            let peer_info = entry.get_mut();
            let had_addresses = !peer_info.addresses.is_empty();
            peer_info.last_seen = Instant::now();
            peer_info.is_connected = true;
            if !addresses.is_empty() {
                peer_info.addresses = addresses.clone();
            }
            !addresses.is_empty() && !had_addresses
        }
```
The should_add_to_routing heuristic is based on “addresses became non-empty,” which conflates ‘has enough info to add’ with ‘not yet added’. Since in_routing_table was removed, there’s no durable way to prevent duplicate adds or to retry after a prior add_node failure, and there’s also a risk of never adding peers that already had addresses from earlier tracking. Consider restoring an explicit routing-membership flag (or querying the core engine for presence) so the decision is based on actual routing table state rather than address transitions.
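An explicit membership flag makes the decision independent of address transitions and also lets a failed `add_node` be retried. A minimal std-only sketch of that idea (all names here are illustrative, not the crate's API):

```rust
use std::collections::HashMap;

/// Hypothetical peer entry with an explicit routing-membership flag, so the
/// promotion decision reflects actual routing table state rather than
/// address transitions.
struct PeerEntry {
    addresses: Vec<String>,
    in_routing_table: bool,
}

#[derive(Default)]
struct PeerTracker {
    peers: HashMap<String, PeerEntry>,
}

impl PeerTracker {
    /// Record an interaction; return true when the peer should be added to
    /// the routing table (has an address and is not yet a member).
    fn observe(&mut self, peer: &str, addresses: &[String]) -> bool {
        let entry = self.peers.entry(peer.to_string()).or_insert(PeerEntry {
            addresses: Vec::new(),
            in_routing_table: false,
        });
        if !addresses.is_empty() {
            entry.addresses = addresses.to_vec();
        }
        !entry.addresses.is_empty() && !entry.in_routing_table
    }

    /// Flip the flag only after add_node succeeds, so a failed add is
    /// retried on the next interaction instead of being dropped forever.
    fn mark_added(&mut self, peer: &str) {
        if let Some(entry) = self.peers.get_mut(peer) {
            entry.in_routing_table = true;
        }
    }
}
```

Because `observe` keeps returning true until `mark_added` is called, a transient `add_node` failure does not permanently exclude the peer.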
```diff
@@ -2713,69 +2686,3 @@ impl Default for DhtNetworkConfig {
 }
 }
 }
```
This PR removes the existing unit tests that exercised routing promotion behavior (the deleted should_promote_to_routing tests), but the routing promotion logic is still being modified substantially. Add new tests that verify the intended behavior after the revert: peers are promoted on non-DHT interactions / general connectivity (e.g., via PeerConnected handling) and that PeerDiscovered is emitted appropriately.
Greptile Overview

Greptile Summary

Replaces `AtomicBool` shutdown flags with `CancellationToken` for cooperative shutdown across node, transport, and DHT components.

Major changes:
Issues found:
Confidence Score: 3/5
| Filename | Overview |
|---|---|
| src/dht_network_manager.rs | Major changes: replaced AtomicBool with CancellationToken, reverted participation-only routing promotion (peers now promoted on any connection), removed unit tests. Potential duplicate routing table additions. |
| src/network.rs | Replaced RwLock<bool> with CancellationToken for shutdown, made is_running() synchronous, updated run loop to use tokio::select! |
| src/transport/ant_quic_adapter.rs | Updated spawn_recv_task to accept CancellationToken; race condition where shutdown check before blocking recv() won't exit until next message |
| src/transport_handle.rs | Replaced AtomicBool with CancellationToken, refactored shutdown with helper methods for joining tasks, updated all background tasks to use tokio::select! |
Sequence Diagram
```mermaid
sequenceDiagram
    participant User
    participant P2PNode
    participant DhtNetworkManager
    participant TransportHandle
    participant BackgroundTasks
    User->>P2PNode: stop()
    P2PNode->>P2PNode: shutdown.cancel()
    Note over P2PNode: Signals run loop to exit
    P2PNode->>DhtNetworkManager: stop()
    DhtNetworkManager->>DhtNetworkManager: shutdown.cancel()
    DhtNetworkManager->>DhtNetworkManager: dht.signal_shutdown()
    Note over DhtNetworkManager: Cancels all DHT background tasks
    P2PNode->>TransportHandle: stop()
    TransportHandle->>TransportHandle: shutdown.cancel()
    TransportHandle->>TransportHandle: dual_node.shutdown_endpoints()
    par Join all background tasks
        TransportHandle->>BackgroundTasks: join recv tasks
        TransportHandle->>BackgroundTasks: join listener task
        TransportHandle->>BackgroundTasks: join connection monitor
        TransportHandle->>BackgroundTasks: join keepalive task
        TransportHandle->>BackgroundTasks: join maintenance task
    end
    BackgroundTasks-->>TransportHandle: Tasks complete via tokio::select!
    TransportHandle->>TransportHandle: disconnect_all_peers()
    TransportHandle-->>P2PNode: Transport stopped
    P2PNode-->>User: Node stopped gracefully
```
```diff
 loop {
-    if shutdown.load(Ordering::Relaxed) {
+    if shutdown.is_cancelled() {
         break;
     }
```
Race condition: shutdown check happens before recv(), but recv() can block indefinitely. If cancellation occurs while waiting in recv(), the task won't exit until the next message arrives.
Use tokio::select! to properly handle cancellation:
```diff
 loop {
-    if shutdown.is_cancelled() {
-        break;
-    }
+    tokio::select! {
+        () = shutdown.cancelled() => {
+            break;
+        }
+        result = transport.endpoint().recv() => {
+            match result {
```
```rust
let peer_info = entry.get_mut();
let had_addresses = !peer_info.addresses.is_empty();
peer_info.last_seen = Instant::now();
peer_info.is_connected = true;
if !addresses.is_empty() {
    peer_info.addresses = addresses.clone();
}
!addresses.is_empty() && !had_addresses
```
Logic issue with routing promotion: if a peer connects multiple times or if addresses changes from non-empty to a different non-empty list, had_addresses will be true but the condition !addresses.is_empty() && !had_addresses will be false, preventing re-promotion. However, if addresses change (e.g., peer reconnects from different IP), we'll attempt to add the peer again on line 2292, which may cause duplicate entries or errors.
Consider tracking whether the peer is already in the routing table to avoid duplicate add_node calls.
…hutdown

Replace check-then-await pattern with tokio::select! to race the cancellation token against recv(), eliminating a race condition where shutdown could be missed between the is_cancelled() check and blocking on recv().

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@greptile-apps @claude please re-review
```diff
-/// so short-lived request clients are not inserted as DHT routing peers.
-async fn update_peer_info(&self, peer_id: PeerId, message: &DhtNetworkMessage) {
+/// Update peer information
+async fn update_peer_info(&self, peer_id: PeerId, _message: &DhtNetworkMessage) {
```
The parameter _message is prefixed with an underscore but never used in the function body. Since this parameter was previously used to determine routing promotion but is no longer needed after the revert, it should be removed from the function signature entirely.
```diff
-async fn update_peer_info(&self, peer_id: PeerId, _message: &DhtNetworkMessage) {
+async fn update_peer_info(&self, peer_id: PeerId, _: &DhtNetworkMessage) {
```
```rust
    avg_latency: Duration::from_millis(50),
    reliability_score: 1.0,
});
!addresses.is_empty()
```
The logic returns true when a new peer is added with addresses, but on line 2247 it returns true only when addresses were empty before and are now non-empty (!addresses.is_empty() && !had_addresses). This inconsistency means new peers will always be promoted even if they shouldn't be, while existing peers will only be promoted when transitioning from no addresses to having addresses. The logic should be consistent: either both cases should check !had_addresses or both should simply check !addresses.is_empty().
Greptile Overview

Greptile Summary

Replaced scattered `AtomicBool` shutdown flags with a unified `CancellationToken`-based shutdown.

Key improvements:
DHT routing revert:
Confidence Score: 4/5
| Filename | Overview |
|---|---|
| src/network.rs | Replaced RwLock<bool> with CancellationToken for shutdown signaling; simplified is_running() to non-async; refactored run loop to use tokio::select! for clean cancellation |
| src/dht_network_manager.rs | Replaced AtomicBool with CancellationToken; reverted participation-only routing (peers now promoted on any connection); removed in_routing_table field from DhtPeerInfo; refactored event handler to use tokio::select! |
| src/transport_handle.rs | Replaced AtomicBool with CancellationToken for shutdown coordination; refactored all background tasks (keepalive, periodic maintenance, connection monitor) to use tokio::select!; added helper methods to reduce code duplication in task cleanup |
| src/transport/ant_quic_adapter.rs | Updated spawn_recv_task to accept CancellationToken instead of AtomicBool; implemented proper cancellation with tokio::select! to handle shutdown immediately during blocking recv() |
Sequence Diagram
```mermaid
sequenceDiagram
    participant User
    participant P2PNode
    participant DhtNetworkManager
    participant TransportHandle
    participant AntQuicAdapter
    participant BackgroundTasks
    User->>P2PNode: stop()
    activate P2PNode
    P2PNode->>P2PNode: shutdown.cancel()
    Note over P2PNode: CancellationToken signals shutdown
    P2PNode->>DhtNetworkManager: stop()
    activate DhtNetworkManager
    DhtNetworkManager->>DhtNetworkManager: shutdown.cancel()
    DhtNetworkManager->>BackgroundTasks: select! cancelled()
    BackgroundTasks-->>DhtNetworkManager: tasks exit cleanly
    DhtNetworkManager-->>P2PNode: Ok
    deactivate DhtNetworkManager
    P2PNode->>TransportHandle: stop()
    activate TransportHandle
    TransportHandle->>TransportHandle: shutdown.cancel()
    TransportHandle->>BackgroundTasks: select! cancelled()
    BackgroundTasks-->>TransportHandle: tasks exit cleanly
    TransportHandle->>AntQuicAdapter: shutdown_endpoints()
    activate AntQuicAdapter
    AntQuicAdapter->>BackgroundTasks: recv tasks via select!
    BackgroundTasks-->>AntQuicAdapter: exit immediately
    AntQuicAdapter-->>TransportHandle: Ok
    deactivate AntQuicAdapter
    TransportHandle-->>P2PNode: Ok
    deactivate TransportHandle
    P2PNode-->>User: Ok
    deactivate P2PNode
    Note over User,BackgroundTasks: No "channel closed" errors
```
Claude finished @mickvandijke's task in 2m 30s

Re-review of PR #26

Tasks:
Summary

This PR makes good progress on improving shutdown semantics with the `CancellationToken` migration.

✅ What's Working Well

1. CancellationToken Migration

The refactoring from `AtomicBool` flags to `CancellationToken` is applied consistently across the affected components.
2. Code Quality
🚨 Issues Found

Issue #1: Inconsistent Routing Table Promotion Logic
…rt()

is_running() only checked !shutdown.is_cancelled(), which is true for a freshly constructed CancellationToken. Add an is_started AtomicBool that is set on start() and cleared on stop() so the method correctly reports false before the node is started.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
```diff
 pub async fn stop(&self) -> Result<()> {
     info!("Stopping P2P node...");

-    // Set running state to false
-    *self.running.write().await = false;
+    // Signal the run loop to exit
+    self.shutdown.cancel();

     // Stop DHT manager first so leave messages can be sent while transport is still active.
     self.dht_manager.stop().await?;
```
CancellationToken cannot be “uncancelled”, but P2PNode stores a single token created in new() and cancels it in stop(). After the first stop, is_running() will always return false and run() will immediately exit even if start() is called again (since start() does not create a fresh token). If restart is intended, store the token behind interior mutability (e.g., RwLock<CancellationToken> / Mutex<CancellationToken> / ArcSwap) and replace it with a new token in start() (or create a per-run child token). If restart is not intended, consider making that explicit by returning an error on subsequent start() calls (or redesigning the API so stop() consumes the instance).
```rust
pub fn is_running(&self) -> bool {
    self.is_started.load(std::sync::atomic::Ordering::Acquire) && !self.shutdown.is_cancelled()
}
```
```diff
 // Track peer and decide whether it should be promoted to routing table.
 let should_add_to_routing = {
     let mut peers = self.dht_peers.write().await;
     match peers.entry(peer_id.clone()) {
         std::collections::hash_map::Entry::Occupied(mut entry) => {
             let peer_info = entry.get_mut();
             let had_addresses = !peer_info.addresses.is_empty();
             peer_info.last_seen = Instant::now();
             peer_info.is_connected = true;
             if !addresses.is_empty() {
                 peer_info.addresses = addresses.clone();
             }
             !addresses.is_empty() && !had_addresses
         }
         std::collections::hash_map::Entry::Vacant(entry) => {
             entry.insert(DhtPeerInfo {
                 peer_id: peer_id.clone(),
                 dht_key,
                 addresses: addresses.clone(),
                 last_seen: Instant::now(),
                 is_connected: true,
                 avg_latency: Duration::from_millis(50),
                 reliability_score: 1.0,
             });
             !addresses.is_empty()
         }
-        std::collections::hash_map::Entry::Vacant(entry) => {
-            entry.insert(DhtPeerInfo {
-                peer_id: peer_id.clone(),
-                dht_key,
-                addresses,
-                last_seen: Instant::now(),
-                is_connected: true,
-                avg_latency: Duration::from_millis(50),
-                reliability_score: 1.0,
-                in_routing_table: false,
-            });
-        };
     }
 };

 // Skip peers with no addresses - they cannot be used for DHT routing.
 let address_str = match addresses.first() {
     Some(addr) => addr.to_string(),
     None => {
         warn!(
             "Peer {} has no addresses, skipping DHT routing table addition",
             peer_id
         );
         return;
     }
 };

-debug!(
-    "Peer {} tracked for DHT liveness; routing promotion deferred until non-lookup DHT participation",
-    peer_id
-);
+if !should_add_to_routing {
+    debug!("Peer {} already tracked in DHT routing state", peer_id);
+    return;
+}
```
This promotion logic can permanently skip adding a peer to the DHT routing table in common orderings. Example: update_peer_info() may insert/update a peer with non-empty peer_info.addresses before handle_peer_connected() runs; then had_addresses becomes true and should_add_to_routing becomes false, and the peer is never added (since update_peer_info() no longer performs the add_node call). Additionally, the code warns + returns on missing addresses.first() before checking should_add_to_routing, which can create noisy warnings and early returns even when no routing-table add is intended. Consider gating the address requirement and warning behind should_add_to_routing, and determine “already in routing table” via an explicit flag (reintroduce in_routing_table) or by making add_node idempotent (attempt to add whenever an address is available and treat “already exists” as success).
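The ordering hazard can be reproduced in a reduced, std-only model of the transition-based condition (all names hypothetical). If update_peer_info() records the peer's addresses first (without performing the routing add), the later PeerConnected check sees had_addresses == true and returns false, so the peer is never promoted:

```rust
use std::collections::HashMap;

/// Reduced model of the transition-based condition under review: promote only
/// when addresses go from empty to non-empty.
fn should_promote(peers: &mut HashMap<String, Vec<String>>, peer: &str, addrs: &[String]) -> bool {
    match peers.get_mut(peer) {
        Some(existing) => {
            let had_addresses = !existing.is_empty();
            if !addrs.is_empty() {
                *existing = addrs.to_vec();
            }
            !addrs.is_empty() && !had_addresses
        }
        None => {
            // First sighting: record addresses and report the transition.
            peers.insert(peer.to_string(), addrs.to_vec());
            !addrs.is_empty()
        }
    }
}
```

In the reviewed code the first sighting (via DHT traffic) only records state and performs no add, so the single moment the condition would fire is spent without adding, and every later interaction returns false.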
```diff
 // Add to DHT routing table.
 {
     use crate::dht::core_engine::{NodeCapacity, NodeId, NodeInfo};

     let node_info = NodeInfo {
         id: NodeId::from_bytes(dht_key),
         address: address_str,
         last_seen: SystemTime::now(),
         capacity: NodeCapacity::default(),
     };

     if let Err(e) = self.dht.write().await.add_node(node_info).await {
         warn!("Failed to add peer {} to DHT routing table: {}", peer_id, e);
     } else {
         info!("Added peer {} to DHT routing table", peer_id);
     }
 }

-/// Whether an inbound DHT message should promote the sender to routing.
-fn should_promote_to_routing(message: &DhtNetworkMessage) -> bool {
-    matches!(&message.message_type, DhtMessageType::Request)
-        && matches!(
-            &message.payload,
-            DhtNetworkOperation::Put { .. } | DhtNetworkOperation::Join
-        )
-}
+if self.event_tx.receiver_count() > 0 {
+    let _ = self
+        .event_tx
+        .send(DhtNetworkEvent::PeerDiscovered { peer_id, dht_key });
+}
```
The routing promotion behavior was substantially changed and the prior unit tests around routing promotion were removed. Add tests that cover the new expected behavior (e.g., a peer gets added to the routing table and PeerDiscovered fires exactly once), including the important ordering where peer metadata is learned/updated before the PeerConnected event (and vice versa), so routing table population stays reliable.
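One way to pin the requested behavior down is as a property: the routing add and PeerDiscovered happen exactly once per peer, regardless of whether metadata or the connect event arrives first. A reduced, std-only sketch of such a test (names hypothetical, not the crate's real types):

```rust
use std::collections::HashMap;

/// Reduced model for the requested tests: an explicit membership flag makes
/// promotion idempotent in either event order.
#[derive(Default)]
struct Routing {
    in_table: HashMap<String, bool>,
    discovered_events: u32,
}

impl Routing {
    /// Called from both metadata updates and PeerConnected handling.
    fn promote(&mut self, peer: &str, has_address: bool) {
        if !has_address {
            return; // nothing routable to add yet
        }
        let member = self.in_table.entry(peer.to_string()).or_insert(false);
        if !*member {
            *member = true;
            self.discovered_events += 1; // PeerDiscovered fires exactly once
        }
    }
}
```

Real tests would drive DhtNetworkManager's actual handlers in both orderings, but the invariant asserted is the same.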
Summary
- Replaces `AtomicBool` shutdown flags with `CancellationToken` across P2PNode, DhtNetworkManager, and TransportHandle for clean, cooperative shutdown. Adds the `tokio-util` dependency. Consolidates scattered shutdown logic and eliminates "channel closed" noise during teardown.
- Reverts the participation-only routing promotion (ff30dab) — peers are once again promoted on any valid interaction, not just DHT operations, restoring proper routing table population.

Test plan

- `cargo test` — all existing tests pass
- `cargo clippy -- -D warnings -D clippy::unwrap_used -D clippy::expect_used` — no new warnings

🤖 Generated with Claude Code