perf(dht,transport): 1000-node scale fixes#72

Closed

mickvandijke wants to merge 1 commit into rc-2026.4.1 from split/perf-1000-node

Conversation

@mickvandijke (Collaborator) commented Apr 6, 2026

Splits the 1000-node perf fixes out of #70 so they can be reviewed and
merged independently from the NAT-traversal hardening in the original
bundle.

Summary

At 1000 nodes two hot paths dominated serialisation in the stack, both
surfacing as response-delivery timeouts under load.

perf(dht): atomic `last_seen` under a read lock

The routing-table touch was called on every inbound DHT message and
always took a write lock, serialising every read behind each update.
`NodeInfo.last_seen` is now an `AtomicInstant` with a fast path that
atomically bumps the timestamp under a read lock when the address merge
would be a no-op (address absent, already present with the same type,
or skipped by the loopback-injection rule). The write-lock slow path
still runs when a real address merge or re-classification is needed.

The fast path correctly respects `addr_type`: promotion of an existing
address from `Direct` → `Relay` escalates to the slow path so
`merge_typed_address` can re-order by type priority. Without this the
relay-promotion call site silently bumped `last_seen` and dropped the
re-classification, breaking the type-priority ordering the dialer relies
on.
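The fast-path decision can be sketched as follows — a minimal model, not the PR's actual code: the names `try_touch_last_seen` and the `Direct`/`Relay` types come from the description above, while the `Option<&str>` address parameter and a millisecond `AtomicU64` standing in for `AtomicInstant` are illustrative assumptions:

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};

#[derive(Clone, Copy, PartialEq, Eq)]
enum AddrType {
    Direct,
    Relay,
}

struct NodeInfo {
    // Millisecond AtomicU64 stands in for the PR's AtomicInstant.
    last_seen_ms: AtomicU64,
    addrs: HashMap<String, AddrType>,
}

fn now_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_millis() as u64)
        .unwrap_or(0)
}

impl NodeInfo {
    /// Fast path, callable under a read lock: bump `last_seen` atomically
    /// only when the address merge would be a no-op. Returns `false` when
    /// the caller must escalate to the write-lock slow path.
    fn try_touch_last_seen(&self, addr: Option<&str>, addr_type: AddrType) -> bool {
        let merge_is_noop = match addr {
            None => true, // no address supplied: nothing to merge
            Some(a) => self.addrs.get(a) == Some(&addr_type),
        };
        if merge_is_noop {
            self.last_seen_ms.store(now_ms(), Ordering::Relaxed);
        }
        // New address, or a Direct -> Relay re-classification: the write-lock
        // slow path must run merge_typed_address to re-order by type priority.
        merge_is_noop
    }
}
```

The key property is the second arm: an address that is present but with a *different* `addr_type` is not a no-op, so a relay promotion is never silently swallowed by the fast path.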

perf(transport): shard inbound message dispatcher

The single consumer task draining every inbound message took three
async write locks plus an awaited peer-id registration per message —
fine at small scale, the dominant serialisation point at 1000 peers.
The receive loop is now split into a light dispatcher that hashes the
source `IpAddr` (not full `SocketAddr`, so an ephemeral-port reconnect
stays on the same shard) and hands off to one of 8 parallel shard
consumers, each running the full parse/register/broadcast pipeline
independently.
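The shard-selection rule above can be sketched in a few lines — `shard_index_for_addr` is named in the PR, but the use of `DefaultHasher` and the exact signature are assumptions:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::net::SocketAddr;

const NUM_SHARDS: u64 = 8;

/// Hash only the IP, not the full SocketAddr, so a reconnect from a
/// fresh ephemeral port maps to the same shard and per-source-IP
/// message ordering is preserved.
fn shard_index_for_addr(addr: &SocketAddr) -> usize {
    let mut hasher = DefaultHasher::new();
    addr.ip().hash(&mut hasher);
    (hasher.finish() % NUM_SHARDS) as usize
}
```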

Fault isolation

The dispatcher uses `try_send` with explicit handling of both
`Full` and `Closed` errors: a single shard's panic or backpressure no
longer collapses the other seven. Drops are counted, with a warning logged
every `SHARD_DROP_LOG_INTERVAL` (64) drops. The dispatcher only
exits when its upstream channel itself closes.
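The isolation behaviour can be modelled with std's bounded channel, whose `Full`/`Disconnected` errors mirror the async channel's `Full`/`Closed` — a sketch under that substitution, with `dispatch_to_shard` as a hypothetical helper name:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

const SHARD_DROP_LOG_INTERVAL: u64 = 64;

/// Non-blocking dispatch to one shard: a Full or Closed shard costs a
/// counted drop (warn-logged at intervals), never a dispatcher exit or
/// a blocking send that would starve the other shards.
fn dispatch_to_shard(
    shard: &SyncSender<Vec<u8>>,
    shard_idx: usize,
    msg: Vec<u8>,
    drop_counter: &AtomicU64,
) {
    if let Err(err) = shard.try_send(msg) {
        let drops = drop_counter.fetch_add(1, Ordering::Relaxed) + 1;
        if drops % SHARD_DROP_LOG_INTERVAL == 0 {
            let reason = match err {
                TrySendError::Full(_) => "backpressure",
                TrySendError::Disconnected(_) => "closed",
            };
            eprintln!("shard {shard_idx} {reason}: {drops} total drops");
        }
    }
}
```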

Lock-hold fix

The `peer_to_channel` write lock is now held across the transport-level
`register_connection_peer_id` call so the app-level map and the
transport's internal addr→peer map are consistent for any concurrent
reader. Previously there was a microsecond window in which a hole-punch
lookup could fail spuriously.

Ordering caveat documented

`shard_index_for_addr`'s docstring previously claimed to preserve
per-peer ordering across reconnects. This is only true across
ephemeral-port reconnects from the same source IP. NAT rebinding,
Wi-Fi↔cellular roaming, or dual-stack failover routes the peer to a
different shard, breaking causal ordering at the application layer.
Docstring now says "per source IP" explicitly.

chore: drop needless struct update in IPDiversityConfig test

Drive-by `clippy::needless_update` fix in test code — also applied in
the sibling NAT PR, so whichever merges second will have a benign
no-op there.

Verification

  • `cargo build --lib` — clean
  • `cargo fmt --check` — clean
  • `cargo clippy --lib -- -D warnings -D clippy::unwrap_used -D clippy::expect_used` — clean
  • `cargo clippy --all-targets -- -D warnings` — clean
  • `cargo test --lib` — 269 passed, 0 failed

Split from #70

This PR and its sibling replace the original #70, which bundled these
1000-node scale fixes with NAT-traversal hardening and an observed
address cache. Byte-for-byte the union of this PR and the sibling
equals the content of #70 (verified by merging the two split branches
and diffing against the original HEAD).

The sibling PR covers the NAT-traversal hardening.

🤖 Generated with Claude Code

Greptile Summary

This PR splits 1000-node performance fixes out of #70: an atomic last_seen fast path in the DHT routing table (running under a read lock via AtomicInstant/parking_lot::Mutex<Instant>) and a sharded inbound message dispatcher fanning out across 8 parallel consumers keyed by source IP. Both changes are logically correct; the type-promotion escalation to the write-lock slow path is handled properly, lock ordering (p2c → c2p) is consistent throughout, and the updated docstring caveat about per-IP (not per-peer) message ordering is accurate. Two minor style improvements are noted inline.

Confidence Score: 5/5

Safe to merge — no correctness bugs found; remaining findings are style and performance suggestions only

Both core changes are logically correct and well-documented. Lock ordering (p2c → c2p) is consistent throughout the codebase, the type-promotion escalation to the slow path is correctly guarded, and fault isolation in the dispatcher (try_send with explicit Full/Closed handling) is appropriate. The two P2 findings do not affect correctness or safety and do not block merge.

src/transport_handle.rs — the lock-hold scope in run_shard_consumer (lines 1568–1590) is broader than necessary for established connections

Important Files Changed

| Filename | Overview |
| --- | --- |
| src/dht/core_engine.rs | Introduces AtomicInstant (parking_lot::Mutex) and a correct fast-path touch that only updates last_seen under a read lock when the address merge is a no-op; type-promotion (Direct→Relay) correctly escalates to the write-lock slow path |
| src/transport_handle.rs | Adds 8-shard message dispatcher with per-IP hash routing and try_send fault isolation; peer_to_channel write lock is held across register_connection_peer_id for every authenticated message including repeats from already-known peers |
| src/dht/security_tests.rs | Drive-by clippy::needless_update fix: removed ..Default::default() spreads where both IPDiversityConfig fields were already explicitly set |
| src/dht_network_manager.rs | No diff changes; reviewed as context — consumers of touch_node_typed benefit from the new read-lock fast path without any required call-site changes |

Sequence Diagram

```mermaid
sequenceDiagram
    participant T as Transport recv tasks
    participant D as Dispatcher
    participant S as Shard 0..7
    participant P2C as peer_to_channel (RwLock)
    participant RT as RoutingTable (RwLock)

    T->>D: (from_addr, bytes)
    D->>D: shard_idx = hash(ip) % 8
    alt shard channel healthy
        D-->>S: try_send(from_addr, bytes)
    else Full or Closed
        D->>D: drop_counter++ warn every 64
    end

    S->>S: parse_protocol_message
    S->>P2C: write lock (every authed msg)
    S->>S: insert channel mapping
    S->>T: register_connection_peer_id (await)
    S->>P2C: drop write lock
    S->>S: emit PeerConnected / broadcast event

    Note over S,RT: DHT touch (separate path)
    S->>RT: read lock → try_touch_last_seen
    Note over RT: AtomicInstant.store_now() under read lock
    alt address new or type changed
        S->>RT: write lock → touch_node (slow path)
    end
```

Greptile also left 2 inline comments on this PR.

At 1000 nodes two hot paths dominated serialisation in the stack, both
showing up as response-delivery timeouts under load. This commit
combines those two fixes and the deep-review follow-ups that hardened
them against cascade failure and regression.

## perf(dht): atomic last_seen under a read lock

The routing-table touch was called on every inbound DHT message and
always took a write lock, serialising every read behind each update.
Wrap `NodeInfo.last_seen` in an `AtomicInstant` and add a fast path
that atomically bumps the timestamp under a read lock when the address
merge would be a no-op (address absent, already present with the same
type, or skipped by the loopback-injection rule). The write-lock slow
path still runs when a real address merge or re-classification is
needed. (a9a126f)

### Fast path respects addr_type (H4, from deep review)

Initially the read-lock fast path only checked whether the address
was already present, ignoring the caller's `addr_type`. The
relay-promotion call site in the DHT bridge would silently bump
`last_seen` and drop the relay re-classification, breaking the
type-priority ordering that the dialer relies on. Plumbed `addr_type`
through `try_touch_last_seen` and `touch_last_seen_if_merge_noop`;
the fast path now refuses to short-circuit when the existing
classification differs, forcing escalation to the slow path so
`merge_typed_address` can re-order.

## perf(transport): shard inbound message dispatcher by source IP

At 1000 peers the single consumer task draining every inbound message
became the dominant serialisation point — each message took three
async write locks plus an awaited peer-id registration before the next
could even be looked at, causing response delivery to miss the 25 s
caller timeout. Split the receive loop into a light dispatcher that
hashes the source IP and hands off to one of 8 parallel shard
consumers, each running the full parse/register/broadcast pipeline
independently. Hashing by `IpAddr` (not full `SocketAddr`) keeps a
peer pinned to the same shard across ephemeral-port reconnects so
per-peer message ordering is preserved for the common case. (ab3a5bf)

### Dispatcher fault isolation (H2/M2, from deep review)

A single shard-consumer panic caused the inbound dispatcher loop to
break, dropping the remaining seven shard senders and cascading down
all healthy shards. Switched to `try_send` with explicit handling of
both `Full` and `Closed` errors: drops are counted and warn-logged at
intervals (coalesced via `SHARD_DROP_LOG_INTERVAL`), and the
dispatcher only exits when its upstream channel itself closes. The
non-blocking `try_send` also addresses M2, where a blocking
`send().await` on one congested shard could starve the other seven
through the same serialisation point the sharding was meant to
eliminate. The eight shards now provide real fault isolation.

### Shard ordering docstring corrected (M3, from deep review)

`shard_index_for_addr` previously claimed to preserve per-peer
ordering across reconnects. This is true only across ephemeral-port
reconnects from the same source IP. NAT rebinding to a new public
IP, mobile Wi-Fi↔cellular roaming, or dual-stack failover routes the
peer to a different shard, breaking causal ordering at the
application layer. Docstring now says "per source IP" with the caveat
called out explicitly.

### Hold lock across register_connection_peer_id (M5, from deep review)

The shard consumer dropped the `peer_to_channel` write lock before
calling `dual_node.register_connection_peer_id`, leaving a window in
which the app-level map said the peer was known but the transport's
internal addr→peer map did not — a hole-punch lookup during that
window could fail spuriously. Lock is now held across the
registration call and dropped immediately after.

## chore: drop needless struct update in IPDiversityConfig test

Both fields of `IPDiversityConfig` are explicitly set, so the trailing
`..IPDiversityConfig::default()` triggers `clippy::needless_update`
under `cargo clippy --all-targets`. Test code only; no functional
change. (171af29)

## Verification

- `cargo build --lib`: clean
- `cargo fmt --check`: clean
- `cargo clippy --lib -- -D warnings -D clippy::unwrap_used -D clippy::expect_used`: clean
- `cargo clippy --all-targets -- -D warnings`: clean
- `cargo test --lib`: 269 passed, 0 failed

This is one of two PRs that split the original PR #70, which bundled
NAT-traversal hardening with these 1000-node scale fixes. The other
PR covers the NAT-traversal work.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Comment thread — src/transport_handle.rs, lines +1568 to +1590

```rust
let mut p2c = peer_to_channel.write().await;
let is_new_peer = !p2c.contains_key(app_id);
let channels = p2c.entry(*app_id).or_default();
let inserted = channels.insert(channel_id.clone());
if inserted {
    channel_to_peers
        .write()
        .await
        .entry(channel_id.clone())
        .or_default()
        .insert(*app_id);
}

if let P2PEvent::Message {
    ref topic,
    ref data,
    ..
} = event
    && topic.starts_with("/rr/")
    && let Ok(envelope) =
        postcard::from_bytes::<RequestResponseEnvelope>(data)
    && envelope.is_response
{
    let mut reqs = active_requests.write().await;
    let expected_peer = match reqs.get(&envelope.message_id) {
        Some(pending) => pending.expected_peer,
        None => {
            trace!(
                message_id = %envelope.message_id,
                "Unmatched /rr/ response (likely timed out) — suppressing"
            );
            continue;
        }
    };
    // Accept response only if the authenticated app-level
    // identity matches. Channel IDs identify connections,
    // not peers, so they are not checked here.
    if authenticated_node_id.as_ref() != Some(&expected_peer) {
        warn!(
// … (intervening lines collapsed in the review excerpt) …

// Register peer ID at the low-level transport
// endpoint so PUNCH_ME_NOW relay can find this
// peer by identity instead of socket address.
dual_node_for_peer_reg
    .register_connection_peer_id(from_addr, *app_id.to_bytes())
    .await;

// Drop the lock before emitting events so subscribers
// that re-enter the registry don't deadlock.
drop(p2c);
```

P2 Write lock held for every authenticated message, not just new registrations

peer_to_channel write lock (p2c) is acquired and register_connection_peer_id is awaited on the hot path for every authenticated message — including repeat messages from an already-known peer where inserted = false. The consistency window this addresses (app-level map vs. transport's addr→peer map) only exists during the first channel registration. Moving register_connection_peer_id inside the if inserted block holds the lock only during new-channel registrations, eliminating write-lock contention on the repeat-message path:

```rust
let mut p2c = peer_to_channel.write().await;
let is_new_peer = !p2c.contains_key(app_id);
let channels = p2c.entry(*app_id).or_default();
let inserted = channels.insert(channel_id.clone());
if inserted {
    channel_to_peers
        .write()
        .await
        .entry(channel_id.clone())
        .or_default()
        .insert(*app_id);
    // Hold p2c here so app-level map and transport's addr→peer map
    // become consistent atomically — window only exists on first insert.
    dual_node_for_peer_reg
        .register_connection_peer_id(from_addr, *app_id.to_bytes())
        .await;
}
drop(p2c);
```

Comment thread — src/transport_handle.rs, line 1467

```rust
// panicked and its receiver is closed we log and drop, but keep
// routing to the other healthy shards. The dispatcher only exits
// when its upstream channel closes (i.e. transport shutdown).
let drop_counter = Arc::new(AtomicU64::new(0));
```

P2 Unnecessary Arc wrapping for a task-local counter

drop_counter is moved into the dispatcher closure and is never shared outside it — the Arc adds a heap allocation and pointer indirection with no benefit. A plain AtomicU64 suffices:

Suggested change:

```diff
- let drop_counter = Arc::new(AtomicU64::new(0));
+ let drop_counter = AtomicU64::new(0);
```
