-
Notifications
You must be signed in to change notification settings - Fork 4.7k
fix(net): flush pending write callback on socket close to prevent stalled streams #27161
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -148,6 +148,11 @@ const SocketHandlers: SocketHandler = { | |
| if (!self || self[kclosed]) return; | ||
| self[kclosed] = true; | ||
| //socket cannot be used after close | ||
| const callback = self[kwriteCallback]; | ||
| if (callback) { | ||
| self[kwriteCallback] = null; | ||
| callback(err || $ERR_SOCKET_CLOSED()); | ||
| } | ||
| detachSocket(self); | ||
| SocketEmitEndNT(self, err); | ||
| self.data = null; | ||
|
|
@@ -314,6 +319,11 @@ const ServerHandlers: SocketHandler<NetSocket> = { | |
| if (!data[kclosed]) { | ||
| data[kclosed] = true; | ||
| //socket cannot be used after close | ||
| const callback = data[kwriteCallback]; | ||
| if (callback) { | ||
| data[kwriteCallback] = null; | ||
| callback(err || $ERR_SOCKET_CLOSED()); | ||
| } | ||
| detachSocket(data); | ||
| SocketEmitEndNT(data, err); | ||
| data.data = null; | ||
|
|
@@ -540,7 +550,11 @@ const SocketHandlers2: SocketHandler<NonNullable<import("node:net").Socket["_han | |
| if (err) $debug(err); | ||
| if (self[kclosed]) return; | ||
| self[kclosed] = true; | ||
| // TODO: should we be doing something with err? | ||
| const callback = self[kwriteCallback]; | ||
| if (callback) { | ||
| self[kwriteCallback] = null; | ||
| callback(err || $ERR_SOCKET_CLOSED()); | ||
| } | ||
|
Comment on lines
+553
to
+557
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Prevent double write-callback invocation on pre-connect close. Line 553-557 now invokes 🔧 Proposed fix const callback = self[kwriteCallback];
if (callback) {
self[kwriteCallback] = null;
- callback(err || $ERR_SOCKET_CLOSED());
+ // For pre-connect writes, _write() already installs once("close", onClose)
+ // that invokes the same callback.
+ if (!self.connecting) {
+ callback(err || $ERR_SOCKET_CLOSED());
+ }
}Also applies to: 1452-1459 🤖 Prompt for AI Agents
Comment on lines
+553
to
+557
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🔴 The new Extended reasoning...What the bug isThe PR adds The two independent callback pathsIn
The Step-by-step proof of double invocation
Why existing safeguards do not prevent thisNulling ImpactIn Node.js/Bun stream internals, calling the const socket = net.connect({ port: 12345, host: "127.0.0.1" });
socket.write("hello"); // write while connecting
socket.destroy(); // → callback invoked twice → ERR_MULTIPLE_CALLBACKFixGuard the function onClose() {
if (self[kwriteCallback] === callback) {
self[kwriteCallback] = null;
callback($ERR_SOCKET_CLOSED_BEFORE_CONNECTION());
}
}Since |
||
| self[kended] = true; | ||
| if (!self.allowHalfOpen) self.write = writeAfterFIN; | ||
| self.push(null); | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,138 @@ | ||||||||||||||
| import { expect, test } from "bun:test"; | ||||||||||||||
| import { bunEnv, bunExe, tempDir } from "harness"; | ||||||||||||||
|
|
||||||||||||||
| // https://github.com/oven-sh/bun/issues/24808 | ||||||||||||||
| // When multiple server-side sockets have their remote clients disconnect while the server | ||||||||||||||
| // is actively writing data, ALL sockets should eventually emit the `close` event. | ||||||||||||||
| // Previously, only one socket would complete the full event lifecycle while the rest | ||||||||||||||
| // would stall after `end`, never emitting `close`. | ||||||||||||||
| test("all server sockets emit close when clients disconnect during active writes", async () => { | ||||||||||||||
| const NUM_CLIENTS = 4; | ||||||||||||||
|
|
||||||||||||||
| using dir = tempDir("24808", { | ||||||||||||||
| "server.js": ` | ||||||||||||||
| const net = require('net'); | ||||||||||||||
| const buffer = Buffer.allocUnsafeSlow(1024 * 128); | ||||||||||||||
| const NUM = ${NUM_CLIENTS}; | ||||||||||||||
| let socketId = 0; | ||||||||||||||
| const closedSet = new Set(); | ||||||||||||||
|
|
||||||||||||||
| const server = net.createServer((c) => { | ||||||||||||||
| const id = ++socketId; | ||||||||||||||
| let canwrite = true; | ||||||||||||||
|
|
||||||||||||||
| function write() { | ||||||||||||||
| if (!c.writable) return; | ||||||||||||||
| if (c.writableLength > 1024 * 1024 || !canwrite) return; | ||||||||||||||
| canwrite = c.write(buffer, (err) => { | ||||||||||||||
| if (err) return; | ||||||||||||||
| canwrite = true; | ||||||||||||||
| // Recursively write to keep the writable stream busy | ||||||||||||||
| write(); | ||||||||||||||
| }); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| write(); | ||||||||||||||
| const tt = setInterval(write, 1); | ||||||||||||||
| c.on("drain", write); | ||||||||||||||
|
|
||||||||||||||
| c.on("end", () => { clearInterval(tt); }); | ||||||||||||||
| c.on("error", () => { clearInterval(tt); }); | ||||||||||||||
| c.on("close", () => { | ||||||||||||||
| closedSet.add(id); | ||||||||||||||
| c.removeAllListeners("drain"); | ||||||||||||||
| clearInterval(tt); | ||||||||||||||
| if (closedSet.size === NUM) { | ||||||||||||||
| console.log("CLOSED:" + JSON.stringify([...closedSet].sort())); | ||||||||||||||
| clearTimeout(failTimer); | ||||||||||||||
| server.close(); | ||||||||||||||
| } | ||||||||||||||
| }); | ||||||||||||||
| }); | ||||||||||||||
|
|
||||||||||||||
| const failTimer = setTimeout(() => { | ||||||||||||||
| console.log("TIMEOUT"); | ||||||||||||||
| console.log("CLOSED:" + JSON.stringify([...closedSet].sort())); | ||||||||||||||
| process.exit(1); | ||||||||||||||
| }, 10000); | ||||||||||||||
|
|
||||||||||||||
| server.listen(0, () => { | ||||||||||||||
| console.log("PORT:" + server.address().port); | ||||||||||||||
| }); | ||||||||||||||
| `, | ||||||||||||||
| }); | ||||||||||||||
|
|
||||||||||||||
| await using serverProc = Bun.spawn({ | ||||||||||||||
| cmd: [bunExe(), "server.js"], | ||||||||||||||
| cwd: String(dir), | ||||||||||||||
| env: bunEnv, | ||||||||||||||
| stdout: "pipe", | ||||||||||||||
| stderr: "pipe", | ||||||||||||||
| }); | ||||||||||||||
|
|
||||||||||||||
| // Read port from server stdout | ||||||||||||||
| const reader = serverProc.stdout.getReader(); | ||||||||||||||
| let portLine = ""; | ||||||||||||||
| while (true) { | ||||||||||||||
| const { value, done } = await reader.read(); | ||||||||||||||
| if (done) break; | ||||||||||||||
| portLine += new TextDecoder().decode(value); | ||||||||||||||
| if (portLine.includes("\n")) break; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| const portMatch = portLine.match(/PORT:(\d+)/); | ||||||||||||||
| expect(portMatch).not.toBeNull(); | ||||||||||||||
| const port = parseInt(portMatch![1]); | ||||||||||||||
|
|
||||||||||||||
| // Create clients that connect, don't read data (to build up server backpressure), then disconnect | ||||||||||||||
| await using clientProc = Bun.spawn({ | ||||||||||||||
| cmd: [ | ||||||||||||||
| bunExe(), | ||||||||||||||
| "-e", | ||||||||||||||
| ` | ||||||||||||||
| const net = require('net'); | ||||||||||||||
| const sockets = []; | ||||||||||||||
| for (let i = 0; i < ${NUM_CLIENTS}; i++) { | ||||||||||||||
| const s = net.connect({ port: ${port}, host: '127.0.0.1' }); | ||||||||||||||
| s.on('error', () => {}); | ||||||||||||||
| sockets.push(s); | ||||||||||||||
| } | ||||||||||||||
| setTimeout(() => { | ||||||||||||||
| sockets.forEach(s => s.destroy()); | ||||||||||||||
| setTimeout(() => process.exit(0), 500); | ||||||||||||||
| }, 2000); | ||||||||||||||
| `, | ||||||||||||||
| ], | ||||||||||||||
| env: bunEnv, | ||||||||||||||
| stdout: "pipe", | ||||||||||||||
| stderr: "pipe", | ||||||||||||||
| }); | ||||||||||||||
|
|
||||||||||||||
| // Wait for server to finish | ||||||||||||||
| let remaining = ""; | ||||||||||||||
| while (true) { | ||||||||||||||
| const { value, done } = await reader.read(); | ||||||||||||||
| if (done) break; | ||||||||||||||
| remaining += new TextDecoder().decode(value); | ||||||||||||||
| } | ||||||||||||||
| reader.releaseLock(); | ||||||||||||||
|
|
||||||||||||||
| const fullOutput = portLine + remaining; | ||||||||||||||
|
|
||||||||||||||
| const [stdout, stderr, exitCode] = await Promise.all([ | ||||||||||||||
| Promise.resolve(fullOutput), | ||||||||||||||
| serverProc.stderr.text(), | ||||||||||||||
| serverProc.exited, | ||||||||||||||
| ]); | ||||||||||||||
|
Comment on lines
+122
to
+126
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick | 🔵 Trivial Minor: Unused variables from Promise.all. The 🔧 Suggested simplification- const [stdout, stderr, exitCode] = await Promise.all([
- Promise.resolve(fullOutput),
- serverProc.stderr.text(),
- serverProc.exited,
- ]);
+ const exitCode = await serverProc.exited;📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||
|
|
||||||||||||||
| await clientProc.exited; | ||||||||||||||
|
|
||||||||||||||
|
Comment on lines
+128
to
+129
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Assert the client process exit code as well. Line 128 awaits 🔧 Proposed fix- await clientProc.exited;
+ const clientExitCode = await clientProc.exited;
+ expect(clientExitCode).toBe(0);As per coding guidelines: "Always check exit codes and test error scenarios when spawning processes in tests." 🤖 Prompt for AI Agents |
||||||||||||||
| const closedMatch = fullOutput.match(/CLOSED:(\[.*?\])/); | ||||||||||||||
| expect(closedMatch).not.toBeNull(); | ||||||||||||||
|
|
||||||||||||||
| const closed = JSON.parse(closedMatch![1]); | ||||||||||||||
|
|
||||||||||||||
| // All sockets should have received the close event | ||||||||||||||
| expect(closed).toEqual([1, 2, 3, 4]); | ||||||||||||||
| expect(exitCode).toBe(0); | ||||||||||||||
| }, 15000); | ||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟡 The test sets an explicit 15000ms timeout as the third argument to Extended reasoning...What the issue isThe new regression test at test("all server sockets emit close when clients disconnect during active writes", async () => {
// ... test body ...
}, 15000);This sets an explicit 15000ms (15 second) timeout for the test. Why this violates project conventionsThe
Bun's test runner already provides built-in timeout handling, so explicit timeouts on individual tests are unnecessary and violate the project's established conventions. Why the timeout was likely addedThe test spawns a server subprocess that does aggressive writes to 4 sockets, waits for clients to disconnect, and verifies all sockets emit ImpactThis is a minor convention violation with no functional impact. The test works correctly with or without the explicit timeout. However, maintaining consistent adherence to project conventions keeps the test suite uniform and prevents proliferation of ad-hoc timeouts. How to fixRemove the }, 15000);to: }); |
||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🔴 Incomplete fix:
SocketHandlers2.close(line 547) is missing the samekwriteCallbackflush that this PR adds toSocketHandlers.closeandServerHandlers.close.SocketHandlers2is the default handler for all client-initiated sockets (assigned at line 708:this[khandlers] = SocketHandlers2), so client sockets that close while a write is in-flight will stall in the same way. Its ownerrorhandler (line 602-606) already flusheskwriteCallback, confirming the pattern is needed here too.Extended reasoning...
What the bug is
This PR fixes the issue where pending
kwriteCallbackis never invoked when a socket closes, causing the writable stream to stall. The fix correctly adds callback flushing toSocketHandlers.close(line 151-155) andServerHandlers.close(line 322-326), but the same fix is missing fromSocketHandlers2.close(line 547-558).Why SocketHandlers2 matters
SocketHandlers2is the default handler for all client-initiated sockets. In theSocketconstructor at line 708, every new socket getsthis[khandlers] = SocketHandlers2. This means any socket created vianet.connect(),net.createConnection(), ornew net.Socket().connect()usesSocketHandlers2— making this a heavily-used code path.The inconsistency within SocketHandlers2 itself
SocketHandlers2.error(lines 595-608) already flushes the pending write callback:But
SocketHandlers2.close(lines 547-558) does not. It only setskended = true, pushes null, and callsread(0). The existing TODO comment at line 553 (// TODO: should we be doing something with err?) even hints this was a known gap.Step-by-step proof of the stall
net.connect()→ constructor assignsthis[khandlers] = SocketHandlers2socket.write(data, callback)→_write()setsself[kwriteCallback] = callbackand waits for native drainSocketHandlers2.closefires → setskclosed, pushes null to readable, but never invokeskwriteCallback_finalis never called →finishandcloseevents are never emittederrorinstead ofclose, the callback IS flushed (line 602-606) and the stream shuts down properly — but a clean close bypasses the error handlerImpact
Any client-side
net.Socketthat has a pending write when the remote end closes cleanly will have its writable stream stall indefinitely. This is the exact same class of bug the PR describes and fixes for server-side sockets.How to fix
Add the same pattern to
SocketHandlers2.close, before the existing logic:This makes all three close handlers consistent and eliminates the stall for client sockets as well.