fix(ws): process pending handshakes concurrently (#2359)
Conversation
Pull request overview
Updates WsTransport’s inbound accept path to avoid head-of-line blocking from slow/malformed HTTP upgrade requests by splitting raw stream acceptance from WebSocket handshake parsing and bounding handshake concurrency.
Changes:
- Refactors WebSocket accept flow to use an accept dispatcher + bounded handshake workers and a bounded queue of successfully-upgraded connections.
- Renames the public constructor argument from `handshakeTimeout` to `headersTimeout` and adds `concurrentAccepts` to bound in-flight handshake work.
- Adds a regression test for “slow headers don’t block valid accepts” and adjusts stream transport tests to tolerate nondeterministic accept order.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| `libp2p/transports/wstransport.nim` | Implements concurrent accept/handshake pipeline and new configuration knobs (`headersTimeout`, `concurrentAccepts`). |
| `tests/libp2p/transports/test_ws.nim` | Adds regression test to ensure slow/incomplete headers don’t block a valid accept. |
| `tests/libp2p/transports/stream_tests.nim` | Updates connection-parallelism test to be robust to nondeterministic accept order in concurrent transports. |
Force-pushed 78cb956 to 2865b68.
```nim
proc(): Future[Connection] {.async: (raises: [CatchableError]).} =
  let req = await readHttpRequest(stream, server.headersTimeout)
  let wstransp = await self.wsserver.handleRequest(req)
  return await self.connHandler(wstransp, server.secure, Direction.In)
)()
.wait(self.headersTimeout)
```
This proc is only here so you can `wait(self.headersTimeout)`? If so, maybe create a template so you can write:

```nim
withTimeout(self.headersTimeout):
  your_code
```

Force-pushed 2865b68 to 0da86f0.
Codecov Report

❌ Patch coverage is
Additional details and impacted files:

```
@@           Coverage Diff            @@
##           master    #2359    +/-  ##
=========================================
+ Coverage   73.77%   73.81%   +0.03%
=========================================
  Files         150      150
  Lines       19810    19910     +100
  Branches       19       19
=========================================
+ Hits        14615    14696      +81
- Misses       5195     5214      +19
```
Force-pushed 19f5121 to 0da86f0.
```nim
slow.close()
slowClosed = true
```
`slow.close()` is used after the accept/dial succeeds, but it does not wait for the underlying transport to actually close. Since this test asserts timing behavior and then immediately tears down connections/transports, prefer `await slow.closeWait()` here (with appropriate error handling) to reduce flakiness from lingering sockets during teardown.
```nim
if not accepted:
  await closeHttpStream(stream)
```
Does it make sense to do this in `defer`? It would avoid the same pattern in the `CancelledError` handler as well.
```nim
finally:
  for fut in acceptFuts:
    if not fut.finished:
      await noCancel fut.cancelAndWait()
    elif fut.completed():
      try:
        await closeHttpStream(fut.read())
      except CatchableError as exc:
        trace "Error reading completed WS accept stream", description = exc.msg
```
Could this be done in `defer`? It would avoid this gigantic `try` block and the extra indentation.
```nim
self.connections[Direction.In].mapIt(it.close()) &
self.connections[Direction.Out].mapIt(it.close())
```
`self.connections.values().mapIt(it.close())`?
Force-pushed 0da86f0 to 262056f.
Force-pushed 262056f to 9cc67a9.
Pull request overview
Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (1)
libp2p/transports/wstransport.nim:389
`stop()` enqueues the `nil` sentinel via `notifyAcceptClosed()` before cancelling `acceptLoop`/`handshakeFuts`. A handshake worker can still successfully enqueue a real `Connection` after this sentinel, causing `accept()` to pop `nil` and raise `TransportClosedError` even though there are valid connections still queued behind it (leaving them unaccepted/leaked).
Consider only signalling closure after all handshake workers are stopped (or preventing workers from enqueueing once shutdown begins), so the sentinel cannot be inserted ahead of real connections.
```nim
self.running = false # mark stopped as soon as possible
self.notifyAcceptClosed()
try:
  trace "Stopping WS transport"
  await procCall Transport(self).stop() # call base
  var toWait: seq[Future[void]]
  if not isNil(self.acceptLoop) and not self.acceptLoop.finished:
    toWait.add(self.acceptLoop.cancelAndWait())
  for fut in self.handshakeFuts:
    if not fut.finished:
      toWait.add(fut.cancelAndWait())
```
Force-pushed 9cc67a9 to f5e4d60.
Force-pushed 3ab31d6 to 6467d4f.
```nim
const
  DefaultHeadersTimeout = 3.seconds
  DefaultConcurrentAccepts = 200
```
Summary
This PR fixes WebSocket accept head-of-line blocking caused by slow or malformed HTTP upgrade requests.
`WsTransport` now accepts raw HTTP streams separately from WebSocket handshake parsing. A dispatcher owns `HttpServer.acceptStream()`, while bounded handshake workers parse headers with `readHttpRequest()` and pass valid requests to `WSServer.handleRequest()`. Successfully upgraded connections are queued for `WsTransport.accept()`. This prevents one incomplete WebSocket handshake from blocking later valid WebSocket connections until the header timeout expires.
Requires:
Should fix:
Affected Areas
WebSocket transport accept/handshake path; dependency bump to `nim-websock` PR 193. Do not merge until then.
Compatibility & Downstream Validation
Reference PRs / branches / commits demonstrating successful integration:
Nimbus:
N/A
Waku:
N/A
Codex:
N/A
Impact on Library Users
This changes the public `WsTransport.new` timeout parameter from `handshakeTimeout` to `headersTimeout`, with the former being deprecated in `nim-websock`.
Behaviorally, malformed or slow WebSocket handshakes are now handled inside the transport and should no longer cause the switch accept loop to stop or block valid WebSocket accepts behind header timeouts.
The WebSocket transport also gains a configurable `concurrentAccepts` limit, defaulting to `200`, to bound concurrent handshake work and accepted-connection queueing.
Risk Assessment
Risk is moderate because this changes the WebSocket accept path and task lifecycle.
Main risks:
- Code passing `handshakeTimeout` must rename it to `headersTimeout`.
Mitigations:
References
`nim-websock` commit: `1c12189667ff5586cb81a2e43fd4a19190b7060d`
Additional Notes
Verified locally with: