Skip to content
65 changes: 53 additions & 12 deletions src/js/node/_http_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,24 @@ function ClientRequest(input, options, cb) {
let writeCount = 0;
let resolveNextChunk: ((end: boolean) => void) | undefined = _end => {};

// Node sends headers + first chunk immediately on the first write(). We
// defer by a tick so that `write(chunk); end();` in the same tick still
// takes the non-duplex fast path via send(). If end() hasn't been called by
// then, start the request in duplex mode so the server can respond while
// the body stream stays open (docker-modem relies on this for
// `container.exec` with stdin: true).
function startFetchAfterFirstWriteNT(self) {
if (!fetching && !self.destroyed && !self.finished) {
startFetch();
}
}

const pushChunk = chunk => {
this[kBodyChunks].push(chunk);
if (writeCount > 1) {
startFetch();
} else if (writeCount === 1) {
process.nextTick(startFetchAfterFirstWriteNT, this);
}
resolveNextChunk?.(false);
};
Expand Down Expand Up @@ -192,10 +206,6 @@ function ClientRequest(input, options, cb) {

this.flushHeaders = function () {
if (!fetching) {
this[kAbortController] ??= new AbortController();
this[kAbortController].signal.addEventListener("abort", onAbort, {
once: true,
});
startFetch();
}
};
Expand Down Expand Up @@ -273,6 +283,16 @@ function ClientRequest(input, options, cb) {

fetching = true;

// Every entry point that dispatches the request (send(), flushHeaders(),
// and the write() → pushChunk paths) must have an AbortController wired
// up before the fetch starts so that req.abort()/req.destroy()/timeouts
// and options.signal can cancel the in-flight request. Centralise that
// here so new callers cannot forget it.
if (!this[kAbortController]) {
this[kAbortController] = new AbortController();
this[kAbortController].signal.addEventListener("abort", onAbort, { once: true });
}

const method = this[kMethod];

let keepalive = true;
Expand Down Expand Up @@ -429,10 +449,20 @@ function ClientRequest(input, options, cb) {
res._dump();
}
} finally {
maybeEmitClose();
if (self.finished) {
maybeEmitClose();
} else {
// Request body is still streaming (duplex); emitting
// 'prefinish'/'close' now would fire before 'finish' (or
// with no 'finish' at all). Defer until req.end() runs
// and send() schedules maybeEmitFinish().
deferredRequestClose = true;
}
if (res.statusCode === 304) {
res.complete = true;
maybeEmitClose();
// maybeEmitClose() already ran above (finished) or is
// deferred via deferredRequestClose (duplex) — no need to
// call it again and bypass the self.finished gate.
return;
}
}
Expand All @@ -442,9 +472,10 @@ function ClientRequest(input, options, cb) {
);
};

if (!keepOpen) {
handleResponse();
}
// Emit the response as soon as headers arrive, even when the request
// body is still being streamed (duplex mode). Node.js emits 'response'
// independently of whether req.end() has been called.
handleResponse();
Comment thread
robobun marked this conversation as resolved.

onEnd();
});
Expand Down Expand Up @@ -554,11 +585,21 @@ function ClientRequest(input, options, cb) {

let onEnd = () => {};
let handleResponse: (() => void) | undefined = () => {};
// Set once handleResponse()'s nextTick has run and found the writable side
// still open; send() uses this to emit 'close' in the correct order after
// 'finish' once req.end() is eventually called.
let deferredRequestClose = false;

function emitFinishAndDeferredCloseNT() {
maybeEmitFinish();
Comment thread
robobun marked this conversation as resolved.
if (deferredRequestClose) {
deferredRequestClose = false;
maybeEmitClose();
}
}

const send = () => {
this.finished = true;
this[kAbortController] ??= new AbortController();
this[kAbortController].signal.addEventListener("abort", onAbort, { once: true });

var body = this[kBodyChunks] && this[kBodyChunks].length > 1 ? new Blob(this[kBodyChunks]) : this[kBodyChunks]?.[0];

Expand All @@ -571,7 +612,7 @@ function ClientRequest(input, options, cb) {
if (!!$debug) globalReportError(err);
this.emit("error", err);
} finally {
process.nextTick(maybeEmitFinish.bind(this));
process.nextTick(emitFinishAndDeferredCloseNT);
}
};

Expand Down
189 changes: 189 additions & 0 deletions test/regression/issue/13696.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
// https://github.com/oven-sh/bun/issues/13696
// node:http ClientRequest: a single req.write() without req.end() never sent
// the request, and in duplex mode 'response' was held back until req.end().
// docker-modem relies on write-once-keep-open for container.exec stdin, which
// is why testcontainers' default HostPortWaitStrategy hung until timeout.

Comment thread
coderabbitai[bot] marked this conversation as resolved.
import { expect, test } from "bun:test";
import { bunEnv, bunExe, tempDir } from "harness";
import { join } from "node:path";

// Runs a fixture that simulates docker-modem's chunked POST with an open
// request body, against a raw TCP server that responds before the request is
// finished. Prints "recv:<text>" for each response chunk as it arrives, and
// "request-seen" once the server has received the headers.
const fixture = (socketPath: string | undefined) => `
const net = require("net");
const http = require("http");

const socketPath = ${JSON.stringify(socketPath)};

const wait = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

const server = net.createServer((sock) => {
let buf = "";
let responded = false;
sock.on("data", async (d) => {
buf += d.toString("latin1");
if (responded || !buf.includes("\\r\\n\\r\\n")) return;
responded = true;
console.log("request-seen");
// Docker's exec response has no Content-Length and no chunked encoding;
// it just writes raw frames until the connection closes.
sock.write(
"HTTP/1.1 200 OK\\r\\n" +
"Content-Type: application/vnd.docker.raw-stream\\r\\n" +
"\\r\\n",
);
for (let i = 0; i < 3; i++) {
sock.write("chunk-" + i + "\\n");
await wait(50);
}
sock.end();
});
});

const listenArgs = socketPath ? [socketPath] : [0, "127.0.0.1"];

server.listen(...listenArgs, () => {
const requestOpts = socketPath
? { socketPath, path: "/exec/abc/start" }
: { host: "127.0.0.1", port: server.address().port, path: "/exec/abc/start" };

// docker-modem passes an empty callback here and attaches 'response'
// separately via req.on('response', ...).
const req = http.request(
{
...requestOpts,
method: "POST",
headers: {
"Content-Type": "application/json",
"Transfer-Encoding": "chunked",
},
},
function () {},
);

req.on("response", (res) => {
console.log("response-status:" + res.statusCode);
res.setEncoding("utf8");
res.on("data", (chunk) => {
for (const line of chunk.split("\\n")) {
if (line) console.log("recv:" + line);
}
});
res.on("end", () => {
console.log("response-end");
server.close();
// The request body stream is still open; end it now so the process
// can exit cleanly.
req.end();
});
});

req.on("error", (err) => {
console.error("request-error:" + err.message);
process.exit(1);
});

// Single write, no req.end(). docker-modem does exactly this for
// openStdin: true.
req.write(JSON.stringify({ Detach: false, Tty: true }));
});

// Exit with whatever we've collected so far if the response never arrives,
// so the parent test gets a clean assertion failure instead of a timeout.
setTimeout(() => process.exit(0), 3000).unref();
`;

for (const socketMode of ["tcp", "unix"] as const) {
test(`http.request delivers response while request body stream is still open (${socketMode})`, async () => {
using dir = tempDir("issue-13696", {});
const socketPath = socketMode === "unix" ? join(String(dir), "docker.sock") : undefined;

await using proc = Bun.spawn({
cmd: [bunExe(), "-e", fixture(socketPath)],
env: bunEnv,
stdout: "pipe",
stderr: "pipe",
});

const [stdout, stderr, exitCode] = await Promise.all([proc.stdout.text(), proc.stderr.text(), proc.exited]);

const lines = stdout.trim().split("\n");
Comment thread
coderabbitai[bot] marked this conversation as resolved.
// The server must have received the request (a single write() dispatches
// it), the response must be emitted with status 200, every body chunk must
// be delivered, and the response must end cleanly.
expect({ lines, stderr }).toEqual({
lines: ["request-seen", "response-status:200", "recv:chunk-0", "recv:chunk-1", "recv:chunk-2", "response-end"],
stderr: expect.not.stringContaining("request-error"),
});
expect(exitCode).toBe(0);
});
}

// Also cover the case where flushHeaders() is called explicitly (which already
// started the fetch in duplex mode) but the response was still being held back
// until req.end().
test("http.request emits 'response' in duplex mode after flushHeaders() without end()", async () => {
const src = `
const net = require("net");
const http = require("http");

const server = net.createServer((sock) => {
let buf = "";
let responded = false;
sock.on("data", (d) => {
buf += d.toString("latin1");
if (responded || !buf.includes("\\r\\n\\r\\n")) return;
responded = true;
sock.write("HTTP/1.1 200 OK\\r\\nContent-Type: text/plain\\r\\n\\r\\nhello");
sock.end();
});
});

server.listen(0, "127.0.0.1", () => {
const { port } = server.address();
const req = http.request({
host: "127.0.0.1",
port,
path: "/",
method: "POST",
headers: { "Transfer-Encoding": "chunked" },
});
req.on("response", (res) => {
let body = "";
res.setEncoding("utf8");
res.on("data", (c) => (body += c));
res.on("end", () => {
console.log("body:" + body);
server.close();
req.end();
});
});
req.on("error", (err) => {
console.error("request-error:" + err.message);
process.exit(1);
});
req.flushHeaders();
req.write("payload");
});

setTimeout(() => process.exit(0), 3000).unref();
`;

await using proc = Bun.spawn({
cmd: [bunExe(), "-e", src],
env: bunEnv,
stdout: "pipe",
stderr: "pipe",
});

const [stdout, stderr, exitCode] = await Promise.all([proc.stdout.text(), proc.stderr.text(), proc.exited]);

expect({ stdout: stdout.trim(), stderr }).toEqual({
stdout: "body:hello",
stderr: expect.not.stringContaining("request-error"),
});
expect(exitCode).toBe(0);
});
Loading