Merged
src/network.rs (2 changes: 1 addition, 1 deletion)
@@ -108,7 +108,7 @@ pub fn is_dht_participant(user_agent: &str) -> bool {
}

/// Capacity of the internal channel used by the message receiving system.
-pub(crate) const MESSAGE_RECV_CHANNEL_CAPACITY: usize = 256;
+pub(crate) const MESSAGE_RECV_CHANNEL_CAPACITY: usize = 2048;

/// Maximum number of concurrent in-flight request/response operations.
pub(crate) const MAX_ACTIVE_REQUESTS: usize = 256;
src/transport_handle.rs (39 changes: 19 additions, 20 deletions)
@@ -1722,18 +1722,22 @@ impl TransportHandle {
if let Some(ref app_id) = authenticated_node_id
&& *app_id != self_peer_id
{
-// Hold `peer_to_channel` across the transport-level
-// `register_connection_peer_id` call so the app-level
-// map and the transport's internal addr→peer map are
-// consistent for any concurrent reader. Without this,
-// there is a microsecond window in which a hole-punch
-// lookup that depends on the registration can fail
-// spuriously even though the app-level map already
-// says the peer is known.
-let mut p2c = peer_to_channel.write().await;
-let is_new_peer = !p2c.contains_key(app_id);
-let channels = p2c.entry(*app_id).or_default();
-let inserted = channels.insert(channel_id.clone());
+// Update app-level peer↔channel mappings under write locks,
+// then drop all locks before the async transport registration.
+// This trades a microsecond consistency window (hole-punch
+// lookups may briefly miss a peer that the app-level map
+// already knows) for dramatically better throughput under
+// load — the previous pattern held a write lock across an
+// async call, blocking all 8 shard consumers on every
+// authenticated message.
+let (is_new_peer, inserted) = {
+    let mut p2c = peer_to_channel.write().await;
+    let is_new = !p2c.contains_key(app_id);
+    let channels = p2c.entry(*app_id).or_default();
+    let ins = channels.insert(channel_id.clone());
+    (is_new, ins)
+}; // p2c lock dropped here

if inserted {
channel_to_peers
.write()
@@ -1743,17 +1747,12 @@ impl TransportHandle {
.insert(*app_id);
}
Comment on lines 1733 to 1748
**P2: Consistency window broader than described — stale peer state, not just omission**

The PR description characterises the window as "hole-punch lookups may briefly miss a peer", which is the *omission* direction (transport unaware of new peer). There is also a *commission* direction: `peer_to_channel` and `channel_to_peers` are now updated in two separate lock acquisitions, and a concurrent `remove_channel_mappings_static` call can race between them.

Concrete failure path: (1) Shard acquires `p2c.write()`, inserts `peer_X → {channel_X}`, drops the lock. (2) Tokio schedules the disconnect handler; `remove_channel_mappings_static` acquires `p2c.write()` (sees the entry) then `c2p.write()` — but `c2p` has no `channel_X` entry yet, so `c2p.remove(channel_id)` returns `None` and cleanup exits **without** removing `peer_X` from `p2c`. (3) The shard resumes and inserts the now-stale `channel_X → peer_X` entry into `c2p`.

Both maps permanently show the peer as connected until restart or reconnect. `is_peer_connected()` / `connected_peers()` / `peer_count()` return incorrect results, and a spurious `PeerConnected` event is emitted. The race window is extremely narrow and the performance trade-off is well-justified. A future hardening option is to update both maps in one scoped block:

```rust
let (is_new_peer, inserted) = {
    let mut p2c = peer_to_channel.write().await;
    let mut c2p = channel_to_peers.write().await;   // hold both briefly
    let is_new = !p2c.contains_key(app_id);
    let channels = p2c.entry(*app_id).or_default();
    let ins = channels.insert(channel_id.clone());
    if ins {
        c2p.entry(channel_id.clone()).or_default().insert(*app_id);
    }
    (is_new, ins)
}; // both locks dropped; register_connection_peer_id runs without any lock
```
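The scoped-block hardening above can be exercised as a self-contained sketch. This version uses `std::sync::RwLock` so it runs standalone (the transport code uses an async `RwLock` with `.write().await`, but the guard-scoping pattern is identical); the map type aliases and the `register` helper are illustrative stand-ins, not the crate's actual API. The key property is that both guards drop at the end of the scope, in the same p2c-then-c2p acquisition order that the cleanup path uses, so the two maps can never disagree and lock ordering stays deadlock-free.

```rust
// Illustrative sketch of updating both peer maps under one lock scope.
// Type aliases and helper name are hypothetical, not from the crate.
use std::collections::{HashMap, HashSet};
use std::sync::RwLock;

type PeerId = u64;
type ChannelId = String;

fn register(
    peer_to_channel: &RwLock<HashMap<PeerId, HashSet<ChannelId>>>,
    channel_to_peers: &RwLock<HashMap<ChannelId, HashSet<PeerId>>>,
    app_id: PeerId,
    channel_id: &ChannelId,
) -> (bool, bool) {
    // Acquire p2c then c2p, the same order as the cleanup path,
    // so the two code paths cannot deadlock against each other.
    let mut p2c = peer_to_channel.write().unwrap();
    let mut c2p = channel_to_peers.write().unwrap();
    let is_new = !p2c.contains_key(&app_id);
    let inserted = p2c.entry(app_id).or_default().insert(channel_id.clone());
    if inserted {
        // Mirror the mapping while both locks are still held.
        c2p.entry(channel_id.clone()).or_default().insert(app_id);
    }
    (is_new, inserted)
    // Both guards drop here; any async transport registration
    // would run afterwards without holding a lock.
}

fn main() {
    let p2c = RwLock::new(HashMap::new());
    let c2p = RwLock::new(HashMap::new());
    let first = register(&p2c, &c2p, 7, &"chan-A".to_string());
    println!("{:?}", first); // (true, true)
}
```

Because the commission-direction race requires cleanup to interleave between the two map updates, holding both guards for the few map operations closes it entirely without reintroducing a lock across an await point.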
-// Register peer ID at the low-level transport
-// endpoint so PUNCH_ME_NOW relay can find this
-// peer by identity instead of socket address.
+// Register peer ID at the low-level transport endpoint.
+// Now runs without holding any write locks.
dual_node_for_peer_reg
.register_connection_peer_id(from_addr, *app_id.to_bytes())
.await;

-// Drop the lock before emitting events so subscribers
-// that re-enter the registry don't deadlock.
-drop(p2c);

if is_new_peer {
peer_user_agents
.write()
@@ -1853,7 +1852,7 @@ const MESSAGE_DISPATCH_SHARDS: usize = 8;
/// MESSAGE_DISPATCH_SHARDS`, but when that division rounds to something
/// too small for healthy bursts we floor it at this value so each shard
/// retains a reasonable amount of buffering headroom.
-const MIN_SHARD_CHANNEL_CAPACITY: usize = 16;
+const MIN_SHARD_CHANNEL_CAPACITY: usize = 128;

/// Log a warning every Nth dropped message in the dispatcher.
///
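The per-shard capacity rule described in the doc comment can be sketched with the constants this PR lands (the `shard_channel_capacity` helper is illustrative; the crate may well compute this inline):

```rust
// Illustrative sketch of the shard-capacity arithmetic: each dispatch
// shard gets an equal slice of the receive-channel capacity, floored
// so a shard keeps burst headroom when the division rounds too small.
const MESSAGE_RECV_CHANNEL_CAPACITY: usize = 2048;
const MESSAGE_DISPATCH_SHARDS: usize = 8;
const MIN_SHARD_CHANNEL_CAPACITY: usize = 128;

// Hypothetical helper, not from the source.
fn shard_channel_capacity() -> usize {
    (MESSAGE_RECV_CHANNEL_CAPACITY / MESSAGE_DISPATCH_SHARDS)
        .max(MIN_SHARD_CHANNEL_CAPACITY)
}

fn main() {
    // New values: 2048 / 8 = 256, above the 128 floor.
    // Old values gave 256 / 8 = 32 per shard (the old floor of 16
    // did not engage), so each shard's buffer grows 8x in this PR.
    println!("{}", shard_channel_capacity()); // 256
}
```

Raising both constants together keeps the division well above the floor, so the floor only matters for configurations with many more shards or a much smaller receive channel.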