From dc863642de5707adae0fdf7faa07ae15c4f122db Mon Sep 17 00:00:00 2001 From: robobun <117481402+robobun@users.noreply.github.com> Date: Thu, 4 Jun 2026 09:11:55 +0000 Subject: [PATCH 1/2] http: support custom socket-producing agents (tunnel) in node:http client MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit node:http / node:https dispatched every request through native fetch and never invoked a user agent's addRequest/createSocket/onSocket hooks. Agents like the `tunnel` package — which establish an HTTP CONNECT tunnel by overriding addRequest and handing back the tunneled socket via req.onSocket() — were silently ignored, so Bun connected directly to the target and bypassed the proxy entirely. Detect when an agent overrides addRequest (and exposes no `.proxy` href, so https-proxy-agent / http-proxy-agent keep using the native fetch proxy path) and drive the request over the socket the agent produces, speaking HTTP/1.x with the existing HTTPParser the way node:http does (tickOnSocket / socketOnData / parserOnIncomingClient). Adds ClientRequest.onSocket() and IncomingMessage._addHeaderLines/_addHeaderLine for the socket-driven path. Builds on the CONNECT-method support from #31574, which the tunnel package's inner CONNECT request relies on. --- src/js/internal/http.ts | 4 + src/js/node/_http_client.ts | 739 +++++++++++++++++++- src/js/node/_http_incoming.ts | 70 +- test/bun.lock | 3 + test/js/node/http/node-http-connect.test.ts | 431 +++++++++++- test/js/node/http/node-http.test.ts | 96 ++- test/package.json | 1 + 7 files changed, 1337 insertions(+), 7 deletions(-) diff --git a/src/js/internal/http.ts b/src/js/internal/http.ts index ecd92d4c4f0..e66316a42d8 100644 --- a/src/js/internal/http.ts +++ b/src/js/internal/http.ts @@ -57,6 +57,9 @@ const kSignal = Symbol("signal"); const kMaxHeaderSize = Symbol("maxHeaderSize"); const abortedSymbol = Symbol("aborted"); const kClearTimeout = Symbol("kClearTimeout"); +// Set when a ClientRequest is driven over a socket produced by a custom agent +// (e.g. the `tunnel` package) instead of the native fetch fast path. +const kCustomSocketPath = Symbol("customSocketPath"); const headerStateSymbol = Symbol("headerState"); // used for pretending to emit events in the right order @@ -514,6 +517,7 @@ export { kBodyChunks, kClearTimeout, kCloseCallback, + kCustomSocketPath, kDeferredTimeouts, kDeprecatedReplySymbol, kEmitState, diff --git a/src/js/node/_http_client.ts b/src/js/node/_http_client.ts index eae63b0a56c..c514444e9b9 100644 --- a/src/js/node/_http_client.ts +++ b/src/js/node/_http_client.ts @@ -45,6 +45,8 @@ const { ClientRequestEmitState, kSignal, kEmptyObject, + kCustomSocketPath, + kHandle, getIsNextIncomingMessageHTTPS, setIsNextIncomingMessageHTTPS, typeSymbol, @@ -54,14 +56,46 @@ const { emitCloseNTAndComplete, } = require("internal/http"); -const { globalAgent } = require("node:_http_agent"); +const { globalAgent, Agent } = require("node:_http_agent"); const { IncomingMessage } = require("node:_http_incoming"); const { OutgoingMessage } = require("node:_http_outgoing"); +const { parsers, freeParser, prepareError, HTTPParser, isLenient } = require("node:_http_common"); +const { kLenientAll, kLenientNone } = HTTPParser; +const { getLazy } = require("internal/shared"); +const net = getLazy(() => require("node:net")); +const tls = getLazy(() => require("node:tls")); +const { getMaxHTTPHeaderSize, statusCodeSymbol, statusMessageSymbol, noBodySymbol } = require("internal/http"); + const globalReportError = globalThis.reportError; const setTimeout = globalThis.setTimeout; const INVALID_PATH_REGEX = /[^\u0021-\u00ff]/; const INVALID_HOST_CHAR_REGEX = /[/\\?#@\t\n\r]/; +const CONNECT_STATUS_LINE_REGEX = /^HTTP\/(\d)\.(\d) (\d{3})(?: (.*))?$/; +const kEmptyBuffer = Buffer.alloc(0); +// Headers Node's IncomingMessage._addHeaderLine treats as singletons: the first +// occurrence wins and later duplicates are discarded (set-cookie is handled +// separately as an array). Used when folding parsed CONNECT response headers. +const kConnectSingletonHeaders = new Set([ + "age", + "authorization", + "content-length", + "content-type", + "etag", + "expires", + "from", + "host", + "if-modified-since", + "if-unmodified-since", + "last-modified", + "location", + "max-forwards", + "proxy-authorization", + "referer", + "retry-after", + "server", + "user-agent", +]); const { URL } = globalThis; @@ -77,6 +111,323 @@ function emitErrorEventNT(self, err) { } } +function emitErrorEvent(request, error) { + request.emit("error", error); +} + +// --- Socket-driven client path (custom agents) --- +// When a request is dispatched through a custom agent that produces its own +// socket (e.g. the `tunnel` package, which CONNECT-tunnels through a proxy and +// hands back the tunneled socket via req.onSocket()), Bun speaks HTTP/1.x over +// that socket using the same HTTPParser machinery Node uses, instead of the +// native fetch fast path. Mirrors node:http's tickOnSocket/socketOnData/ +// parserOnIncomingClient flow. + +function statusIsInformational(status) { + // 1xx except 101 (Switching Protocols). + return status < 200 && status >= 100 && status !== 101; +} + +function parserOnIncomingClient(this: any, res, shouldKeepAlive) { + const socket = this.socket; + const req = socket._httpMessage; + + if (req.res) { + // Double response from the server. + socket.destroy(); + return 0; + } + req.res = res; + + if (res.upgrade) return 2; + + const method = req.method; + if (method === "CONNECT") { + res.upgrade = true; + return 2; + } + + if (statusIsInformational(res.statusCode)) { + req.res = null; + if (res.statusCode === 100) { + req.emit("continue"); + } + req.emit("information", { + statusCode: res.statusCode, + statusMessage: res.statusMessage, + httpVersion: res.httpVersion, + httpVersionMajor: res.httpVersionMajor, + httpVersionMinor: res.httpVersionMinor, + headers: res.headers, + rawHeaders: res.rawHeaders, + }); + return 1; + } + + if (req.shouldKeepAlive && !shouldKeepAlive && !req[kUpgradeOrConnect]) { + req.shouldKeepAlive = false; + } + + res.req = req; + res.on("end", responseOnEnd); + + // If the user did not listen for 'response', dump the body so the socket + // doesn't hang in a paused state. + if (req.aborted || !req.emit("response", res)) res._dump(); + + if (method === "HEAD") return 1; + if (res.statusCode === 304) { + res.complete = true; + return 1; + } + + return 0; +} + +function responseOnEnd(this: any) { + const req = this.req; + const socket = req.socket; + + req._ended = true; + + if (!req.shouldKeepAlive) { + if (socket && socket.writable) { + if (typeof socket.destroySoon === "function") socket.destroySoon(); + else socket.end(); + } + } +} + +function socketOnData(this: any, d) { + const socket = this; + const req = this._httpMessage; + const parser = this.parser; + + if (!parser) return; + + const ret = parser.execute(d); + if (ret instanceof Error) { + prepareError(ret, parser, d); + freeParser(parser, req, socket); + socket.removeListener("data", socketOnData); + socket.removeListener("end", socketOnEnd); + socket.destroy(); + req.socket._hadError = true; + emitErrorEvent(req, ret); + } else if (parser.incoming?.upgrade) { + // Upgrade (101) or CONNECT. + const bytesParsed = ret; + const res = parser.incoming; + req.res = res; + + socket.removeListener("data", socketOnData); + socket.removeListener("end", socketOnEnd); + + parser.finish(); + freeParser(parser, req, socket); + + const bodyHead = d.slice(bytesParsed, d.length); + + const eventName = req.method === "CONNECT" ? "connect" : "upgrade"; + if (req.listenerCount(eventName) > 0) { + req[kUpgradeOrConnect] = true; + + socket.emit("agentRemove"); + socket.removeListener("close", socketCloseListener); + socket.removeListener("error", socketErrorListener); + + socket._httpMessage = null; + socket.readableFlowing = null; + + req.emit(eventName, res, socket, bodyHead); + req.destroyed = true; + req._closed = true; + req.emit("close"); + } else { + socket.destroy(); + } + } else if (parser.incoming?.complete && !statusIsInformational(parser.incoming.statusCode)) { + socket.removeListener("data", socketOnData); + socket.removeListener("end", socketOnEnd); + freeParser(parser, req, socket); + } +} + +function socketOnEnd(this: any) { + const socket = this; + const req = this._httpMessage; + const parser = this.parser; + + if (!req.res && !req.socket._hadError) { + req.socket._hadError = true; + emitErrorEvent(req, new ConnResetException("socket hang up")); + } + if (parser) { + parser.finish(); + freeParser(parser, req, socket); + } + socket.destroy(); +} + +function socketErrorListener(this: any, err) { + const socket = this; + const req = socket._httpMessage; + + if (req) { + req.socket._hadError = true; + emitErrorEvent(req, err); + } + + const parser = socket.parser; + if (parser) { + parser.finish(); + freeParser(parser, req, socket); + } + + socket.removeListener("data", socketOnData); + socket.removeListener("end", socketOnEnd); + socket.destroy(); +} + +function socketCloseListener(this: any) { + const socket = this; + const req = socket._httpMessage; + + const parser = socket.parser; + const res = req.res; + + req.destroyed = true; + if (res) { + if (!res.complete) { + res.destroy(new ConnResetException("aborted")); + } + req._closed = true; + req.emit("close"); + if (!res.aborted && res.readable) { + res.push(null); + } + } else { + if (!req.socket._hadError) { + req.socket._hadError = true; + emitErrorEvent(req, new ConnResetException("socket hang up")); + } + req._closed = true; + req.emit("close"); + } + + if (parser) { + parser.finish(); + freeParser(parser, req, socket); + } +} + +function tickOnSocket(req, socket) { + const parser = parsers.alloc(); + req.socket = socket; + const lenient = req.insecureHTTPParser === undefined ? isLenient() : req.insecureHTTPParser; + parser.initialize(HTTPParser.RESPONSE, {}, req.maxHeaderSize || 0, lenient ? kLenientAll : kLenientNone, null); + parser.socket = socket; + parser.outgoing = req; + req[kParser] = parser; + + socket.parser = parser; + socket._httpMessage = req; + + if (typeof req[kMaxHeadersCount] === "number") { + parser.maxHeaderPairs = req[kMaxHeadersCount] << 1; + } + + parser.joinDuplicateHeaders = req.joinDuplicateHeaders; + parser.onIncoming = parserOnIncomingClient; + socket.on("error", socketErrorListener); + socket.on("data", socketOnData); + socket.on("end", socketOnEnd); + socket.on("close", socketCloseListener); + + req.emit("socket", socket); +} + +const CRLF = "\r\n"; + +// Serialize the request line + headers into the latin1 string written to a +// custom agent's socket. Mirrors the subset of OutgoingMessage._storeHeader the +// socket-driven path needs. +function buildRequestHead(req) { + let head = `${req[kMethod]} ${req[kPath]} HTTP/1.1${CRLF}`; + + const headers = req.getHeaders(); + let hasHost = false; + let hasConnection = false; + let hasContentLength = false; + let hasTransferEncoding = false; + + for (const key in headers) { + const lowerKey = key.toLowerCase(); + if (lowerKey === "host") hasHost = true; + else if (lowerKey === "connection") hasConnection = true; + else if (lowerKey === "content-length") hasContentLength = true; + else if (lowerKey === "transfer-encoding") hasTransferEncoding = true; + + const value = headers[key]; + if ($isJSArray(value)) { + for (let i = 0; i < value.length; i++) head += `${key}: ${value[i]}${CRLF}`; + } else { + head += `${key}: ${value}${CRLF}`; + } + } + + if (!hasHost) { + const host = req[kHost]; + head += `Host: ${req[kUseDefaultPort] ? host : `${host}:${req[kPort]}`}${CRLF}`; + } + if (!hasConnection) { + head += `Connection: ${req.shouldKeepAlive ? "keep-alive" : "close"}${CRLF}`; + } + + if (!hasContentLength && !hasTransferEncoding) { + const chunks = req[kBodyChunks]; + let bodyLength = 0; + if (chunks) for (let i = 0; i < chunks.length; i++) bodyLength += chunks[i].length; + if (bodyLength > 0) head += `Content-Length: ${bodyLength}${CRLF}`; + } + + head += CRLF; + return head; +} + +function emitFinishNT(req) { + if (req.destroyed) return; + if (!(req[kEmitState] & (1 << ClientRequestEmitState.finish))) { + req[kEmitState] |= 1 << ClientRequestEmitState.finish; + req.emit("prefinish"); + req.emit("finish"); + } +} + +function onSocketNT(req, socket, err) { + if (req.destroyed || err) { + req.destroyed = true; + if (socket) { + if (!err && req[kAgent] && !socket.destroyed) { + socket.emit("free"); + return; + } + socket.destroy(err); + } + if (!req.aborted && !err) { + err = new ConnResetException("socket hang up"); + } + if (err) { + emitErrorEvent(req, err); + } + req._closed = true; + req.emit("close"); + } else { + tickOnSocket(req, socket); + req._flushToSocket(); + } +} + function ClientRequest(input, options, cb) { if (!(this instanceof ClientRequest)) { return new (ClientRequest as any)(input, options, cb); @@ -114,6 +465,18 @@ function ClientRequest(input, options, cb) { } const pushChunk = chunk => { + // Socket-driven path: write the chunk straight to the socket once it is + // attached, otherwise buffer it for _flushToSocket(). + if (this[kCustomSocketPath]) { + if (this[kHandle]) { + this[kBodyChunks] = []; + this[kHandle].write(chunk); + } else { + this[kBodyChunks].push(chunk); + } + return; + } + this[kBodyChunks].push(chunk); if (writeCount > 1) { startFetch(); @@ -207,11 +570,39 @@ function ClientRequest(input, options, cb) { }; this.flushHeaders = function () { + if (this[kCustomSocketPath]) return; if (!fetching) { startFetch(); } }; + // Serialize the request head and write the request (head + any buffered body) + // to the socket produced by a custom agent. Called from onSocketNT once + // tickOnSocket() has wired up the parser + listeners. Writes go straight to + // the socket, which buffers them until the connection is established. + this._flushToSocket = () => { + const socket = this.socket; + if (!socket || socket.destroyed) return; + + this[kHandle] = socket; + + if (!this._headerSent) { + this._headerSent = true; + socket.write(buildRequestHead(this), "latin1"); + } + + const chunks = this[kBodyChunks]; + if (chunks && chunks.length > 0) { + for (let i = 0; i < chunks.length; i++) socket.write(chunks[i]); + this[kBodyChunks] = []; + this.emit("drain"); + } + + if (this.finished) { + process.nextTick(emitFinishNT, this); + } + }; + this.destroy = function (err?: Error) { if (this.destroyed) return this; this.destroyed = true; @@ -279,10 +670,27 @@ function ClientRequest(input, options, cb) { let fetching = false; const startFetch = (customBody?) => { + // When a custom agent produces the socket, the request bytes are written to + // it in _flushToSocket() once onSocket() fires — the native fetch path and + // CONNECT fast path are both bypassed. + if (this[kCustomSocketPath]) { + return false; + } + if (fetching) { return false; } + // CONNECT tunnels (HTTP proxies) have no representation in fetch(): the + // request target is a `host:port` authority, not a URL, and the response + // is a raw socket rather than a message body. Dispatch it over a raw TCP + // socket instead and emit the 'connect' event, matching Node. + if (this[kMethod] === "CONNECT") { + fetching = true; + startConnect(); + return true; + } + fetching = true; // Every entry point that dispatches the request (send(), flushHeaders(), @@ -603,6 +1011,286 @@ function ClientRequest(input, options, cb) { } }; + // Dispatch a CONNECT request over a raw TCP (or TLS) socket and emit the + // 'connect' event once the proxy's response status line + headers arrive. + // This mirrors Node's http.ClientRequest CONNECT handling so HTTP proxy + // clients (e.g. @grpc/grpc-js proxy support) work. + const startConnect = () => { + if (!this[kAbortController]) { + this[kAbortController] = new AbortController(); + this[kAbortController].signal.addEventListener("abort", onAbort, { once: true }); + } + + this[kUpgradeOrConnect] = true; + + let keepalive = true; + const agentKeepalive = this[kAgent]?.keepAlive; + if (agentKeepalive !== undefined) { + keepalive = agentKeepalive; + } + + const connectOptions: any = { + signal: this[kAbortController].signal, + }; + const socketPath = this[kSocketPath]; + if (socketPath) { + connectOptions.path = socketPath; + } else { + connectOptions.host = this[kHost]; + connectOptions.port = this[kPort]; + // Forward the socket-level options Node honors when connecting to the + // proxy authority, so a custom DNS resolver (split-horizon DNS, service + // discovery) and address selection work the same as the normal path. + // net.connect() implements the resolution itself, so no manual loop. + if (options.lookup !== undefined) connectOptions.lookup = options.lookup; + if (options.family !== undefined) connectOptions.family = options.family; + if (options.hints !== undefined) connectOptions.hints = options.hints; + if (options.localAddress !== undefined) connectOptions.localAddress = options.localAddress; + if (options.localPort !== undefined) connectOptions.localPort = options.localPort; + } + + const isTLS = this[kProtocol] === "https:"; + if (isTLS && this[kTls]) { + ObjectAssign(connectOptions, this[kTls]); + connectOptions.servername = this[kTls].servername; + } + + let socket; + try { + socket = isTLS ? tls().connect(connectOptions) : net().connect(connectOptions); + } catch (err) { + fetching = false; + process.nextTick((self, err) => self.emit("error", err), this, err); + // Keep this terminal path consistent with onError below: emit 'close' + // after 'error' so a req.on('close') cleanup listener still runs. + maybeEmitClose(); + return; + } + + this.socket = socket; + + // Default Host/Connection headers, matching Node. A CONNECT request with no + // Host header is rejected by many proxies (and by Bun's own server parser), + // so add one pointing at the proxy authority unless the caller set it. + if (!this.hasHeader("host") && !socketPath) { + let hostHeader = this[kHost]; + if (isIPv6(hostHeader)) { + hostHeader = `[${hostHeader}]`; + } + if (!this[kUseDefaultPort]) { + hostHeader += ":" + this[kPort]; + } + this.setHeader("Host", hostHeader); + } + if (!this.hasHeader("connection")) { + this.setHeader("Connection", keepalive ? "keep-alive" : "close"); + } + + // Write the CONNECT request line + headers. The request target is the + // `host:port` authority from options.path, not a URL path, so it must be + // written verbatim (no leading slash). Use the raw (original-case) header + // names so the wire bytes match what the caller set, like Node. + const headerLines = [`CONNECT ${this[kPath]} HTTP/1.1`]; + const rawNames = this.getRawHeaderNames(); + for (let i = 0; i < rawNames.length; i++) { + const name = rawNames[i]; + const value = this.getHeader(name); + if (value === undefined) continue; + if ($isJSArray(value)) { + for (let j = 0; j < value.length; j++) { + headerLines.push(`${name}: ${value[j]}`); + } + } else { + headerLines.push(`${name}: ${value}`); + } + } + const requestHead = headerLines.join("\r\n") + "\r\n\r\n"; + + let connected = false; + let buffer: Buffer | null = null; + const maxHeaderSize = this[kMaxHeaderSize] || getMaxHTTPHeaderSize(); + + const swallowTeardownError = () => {}; + + const onError = err => { + if (connected) return; + socket.removeListener("data", onData); + socket.removeListener("error", onError); + socket.removeListener("close", onClose); + // Keep swallowTeardownError attached here: on a pre-tunnel failure/abort + // the AbortController can still emit an AbortError on the socket after + // this runs, and it must not surface as an unhandled 'error'. + this[kClearTimeout]?.(); + // Abort/destroy is handled by onAbort → socketCloseListener, which emits + // 'close' and also synthesizes a socket 'close' that lands here; don't + // surface a spurious 'error' for a user-initiated teardown (Node doesn't). + if (isAbortError(err) || this.destroyed || this[abortedSymbol]) return; + // net/tls already produce a Node-shaped error (code/syscall/address/port), + // so propagate it verbatim like Node rather than flattening it. + fetching = false; + try { + this.emit("error", err); + } catch {} + // The request is done: emit 'close' like Node does after a failed request. + maybeEmitClose(); + }; + + const onClose = () => { + if (connected) return; + onError(new ConnResetException("socket hang up")); + }; + + const onData = chunk => { + buffer = buffer ? Buffer.concat([buffer, chunk]) : chunk; + + const headerEnd = buffer.indexOf("\r\n\r\n"); + if (headerEnd === -1) { + if (buffer.length > maxHeaderSize) { + socket.destroy(); + onError($HPE_HEADER_OVERFLOW("Header overflow")); + } + return; + } + // Reject an oversized header block even when it arrives complete (with its + // terminator) in a single read, so maxHeaderSize is honored the way Node's + // llhttp counts header bytes regardless of where \r\n\r\n lands. + if (headerEnd > maxHeaderSize) { + socket.destroy(); + onError($HPE_HEADER_OVERFLOW("Header overflow")); + return; + } + + const headerText = buffer.toString("latin1", 0, headerEnd); + + const lines = headerText.split("\r\n"); + const statusLine = lines.shift() || ""; + // "HTTP/1.1 200 Connection established" + const statusMatch = RegExpPrototypeExec.$call(CONNECT_STATUS_LINE_REGEX, statusLine); + if (!statusMatch) { + // A proxy that answers with an unparseable status line isn't a tunnel; + // fail the request instead of emitting 'connect' with no statusCode. + // onError runs before `connected` flips, so it still fires. + socket.destroy(); + onError($HPE_INVALID_HEADER_TOKEN("Parse Error: Invalid header token encountered")); + return; + } + + connected = true; + socket.removeListener("data", onData); + socket.removeListener("error", onError); + socket.removeListener("close", onClose); + // Hand the tunnel socket to the user with no internal listeners, like Node. + socket.removeListener("error", swallowTeardownError); + // Our internal 'data' listener put the socket into flowing mode; reset it + // to the neutral (neither flowing nor paused) state like Node does before + // emitting 'connect', so bytes after the headers stay buffered until the + // user attaches a 'data' listener / pipes / resumes (no data loss). + socket.readableFlowing = null; + this[kClearTimeout]?.(); + fetching = false; + + const head = headerEnd + 4 < buffer.length ? buffer.subarray(headerEnd + 4) : kEmptyBuffer; + buffer = null; + + const res = new IncomingMessage(null, kEmptyObject); + res.httpVersion = `${statusMatch[1]}.${statusMatch[2]}`; + res[statusCodeSymbol] = Number(statusMatch[3]); + // Deliver the reason phrase verbatim, "" when omitted, matching llhttp/Node. + res[statusMessageSymbol] = statusMatch[4] ?? ""; + + const rawHeaders: string[] = []; + // Null prototype so a proxy header literally named "constructor"/"__proto__" + // folds against an absent own property instead of an inherited one. + const parsedHeaders: Record = { __proto__: null } as any; + for (let i = 0; i < lines.length; i++) { + const line = lines[i]; + const colon = line.indexOf(":"); + if (colon === -1) continue; + const key = line.slice(0, colon); + // Strip OWS = *(SP / HTAB) on both sides of the value, matching llhttp + // (RFC 7230 §3.2.4), so padded proxy headers parse like they do in Node. + let start = colon + 1; + let end = line.length; + while (start < end && (line.charCodeAt(start) === 32 || line.charCodeAt(start) === 9)) start++; + while (end > start && (line.charCodeAt(end - 1) === 32 || line.charCodeAt(end - 1) === 9)) end--; + const val = line.slice(start, end); + $putByValDirect(rawHeaders, rawHeaders.length, key); + $putByValDirect(rawHeaders, rawHeaders.length, val); + // Fold into headers with Node's _addHeaderLine rules: set-cookie is + // always an array, singleton headers keep the first value, everything + // else is comma-joined. + const lowerKey = key.toLowerCase(); + const existing = parsedHeaders[lowerKey]; + if (lowerKey === "set-cookie") { + if (existing === undefined) parsedHeaders[lowerKey] = [val]; + else (existing as string[]).push(val); + } else if (existing === undefined) { + parsedHeaders[lowerKey] = val; + } else if (!kConnectSingletonHeaders.has(lowerKey)) { + parsedHeaders[lowerKey] = `${existing}, ${val}`; + } + } + res.headers = parsedHeaders; + res.rawHeaders = rawHeaders; + // The CONNECT response has no body; mark it complete so reads emit EOF + // instead of touching the (absent) fetch Response backing store. + res[noBodySymbol] = true; + res.complete = true; + res.push(null); + + // Point res.socket at the real tunnel socket and back-reference the + // response from the request, matching Node (res.socket === socket, + // req.res === res, res.upgrade === true). Node leaves res.req undefined + // for CONNECT, so we do too. + res.upgrade = true; + res.socket = socket; + this.res = res; + + // The request is finished from the writable side's perspective. + if (!this.finished) { + this.finished = true; + } + process.nextTick(emitFinishAndDeferredCloseNT); + + if (this.listenerCount("connect") > 0) { + this.emit("connect", res, socket, head); + } else { + // Node destroys the socket when nobody is listening for 'connect'. + socket.destroy(); + } + + // Attach this after the emit so the user's 'connect' handler sees the + // tunnel socket with no internal listeners, like Node. Socket 'close' is + // async, so a listener added here still fires even if the handler called + // socket.destroy() synchronously. Once the tunnel socket goes away, the + // request is done too: emit 'close' the way Node does on CONNECT close. + socket.once("close", () => { + maybeEmitClose(); + }); + }; + + // Swallow a late error that fires during pre-tunnel teardown (e.g. the + // AbortController's AbortError when the request is aborted/destroyed before + // the tunnel is established) so it doesn't surface as an unhandled 'error'. + // Removed once the tunnel is handed to the user so the socket is delivered + // with no internal listeners, like Node. + socket.on("error", swallowTeardownError); + socket.on("data", onData); + socket.on("error", onError); + socket.on("close", onClose); + + const writeHead = () => { + socket.write(requestHead); + }; + if (socket.connecting) { + socket.once(isTLS ? "secureConnect" : "connect", writeHead); + } else { + writeHead(); + } + + return true; + }; + let onEnd = () => {}; let handleResponse: (() => void) | undefined = () => {}; // Set once handleResponse()'s nextTick has run and found the writable side @@ -621,6 +1309,16 @@ function ClientRequest(input, options, cb) { const send = () => { this.finished = true; + if (this[kCustomSocketPath]) { + // The socket-driven path flushes the request (and emits 'finish') from + // _flushToSocket() once the agent produces a socket. If the socket is + // already attached (the agent called onSocket synchronously), flush now. + if (this[kHandle]) { + this._flushToSocket(); + } + return; + } + var body = this[kBodyChunks] && this[kBodyChunks].length > 1 ? new Blob(this[kBodyChunks]) : this[kBodyChunks]?.[0]; try { @@ -977,10 +1675,32 @@ function ClientRequest(input, options, cb) { this._httpMessage = this; - process.nextTick(emitContinueAndSocketNT, this); - this[kEmitState] = 0; + // A custom agent that overrides addRequest (e.g. the `tunnel` package) + // produces its own socket, which Bun must speak HTTP/1.x over directly + // instead of going through native fetch. Agents that expose a `.proxy` href + // (https-proxy-agent / http-proxy-agent) continue to use the native fetch + // proxy fast path, so they are excluded here. + let usesCustomSocket = false; + if (agent && agent.addRequest !== Agent.prototype.addRequest) { + let agentProxy; + try { + agentProxy = agent.proxy; + } catch {} + if (agentProxy == null) { + usesCustomSocket = true; + } + } + this[kCustomSocketPath] = usesCustomSocket; + + if (usesCustomSocket) { + // Node defaults Connection based on the agent's keepAlive. + this.shouldKeepAlive = !!agent.keepAlive; + } else { + process.nextTick(emitContinueAndSocketNT, this); + } + this.setSocketKeepAlive = (_enable = true, _initialDelay = 0) => {}; this.setNoDelay = (_noDelay = true) => {}; @@ -993,12 +1713,25 @@ function ClientRequest(input, options, cb) { this.removeAllListeners("timeout"); } }; + + if (usesCustomSocket) { + // initiate connection: the agent creates a socket and calls onSocket(). + // Matches node:http, which calls agent.addRequest(this, opts) at the end of + // the ClientRequest constructor. + agent.addRequest(this, optsWithoutSignal); + } } const ClientRequestPrototype = { constructor: ClientRequest, __proto__: OutgoingMessage.prototype, + // Called by a custom agent (e.g. `tunnel`) with the socket it produced. + onSocket(socket, err) { + process.nextTick(onSocketNT, this, socket, err); + return this; + }, + setTimeout(msecs, callback) { if (this.destroyed) { return this; diff --git a/src/js/node/_http_incoming.ts b/src/js/node/_http_incoming.ts index 83de7a47c71..6921ced937a 100644 --- a/src/js/node/_http_incoming.ts +++ b/src/js/node/_http_incoming.ts @@ -149,10 +149,16 @@ function IncomingMessage(req, options = defaultIncomingOpts) { this[fakeSocketSymbol] = req; } } else { - // Node defaults url and method to null. + // Node defaults url and method to null. This is the path taken when the + // HTTPParser constructs a response IncomingMessage over a raw socket + // (the socket-driven client path), passing the socket as `req`. this.url = ""; this.method = null; + this.headers = {}; this.rawHeaders = []; + if (req && typeof req.on === "function") { + this[fakeSocketSymbol] = req; + } } this[noBodySymbol] = @@ -294,6 +300,68 @@ const IncomingMessagePrototype = { _finish() { this.emit("prefinish"); }, + // Populate this.headers / this.rawHeaders from the flat [key, value, ...] + // array produced by the HTTPParser. Used by the socket-driven client path + // (see _http_common.ts parserOnHeadersComplete). Mirrors the observable + // behavior of node:http's IncomingMessage._addHeaderLines. + _addHeaderLines(headers, n) { + if (headers?.length) { + // Trailers are exposed as a frozen empty object on IncomingMessage, so + // only leading headers are populated here (matches the fetch path). + if (this.complete) return; + this.rawHeaders = headers; + const dest = this.headers; + + if (dest) { + for (let i = 0; i < n; i += 2) { + this._addHeaderLine(headers[i], headers[i + 1], dest); + } + } + } + }, + _addHeaderLine(field, value, dest) { + const lower = field.toLowerCase(); + const existing = dest[lower]; + + if (lower === "set-cookie") { + if (existing !== undefined) { + existing.push(value); + } else { + dest[lower] = [value]; + } + return; + } + + if (existing === undefined) { + dest[lower] = value; + return; + } + + // These headers are not joined with ", " by Node; the first value wins. + switch (lower) { + case "age": + case "authorization": + case "content-length": + case "content-type": + case "etag": + case "expires": + case "from": + case "host": + case "if-modified-since": + case "if-unmodified-since": + case "last-modified": + case "location": + case "max-forwards": + case "proxy-authorization": + case "referer": + case "retry-after": + case "server": + case "user-agent": + return; + default: + dest[lower] = existing + ", " + value; + } + }, _destroy: function IncomingMessage_destroy(err, cb) { const shouldEmitAborted = !this.readableEnded || !this.complete; diff --git a/test/bun.lock b/test/bun.lock index dcb5ca244e4..a1f555505ac 100644 --- a/test/bun.lock +++ b/test/bun.lock @@ -94,6 +94,7 @@ "supertest": "6.3.3", "svelte": "5.20.4", "tsyringe": "4.8.0", + "tunnel": "0.0.6", "type-graphql": "2.0.0-rc.2", "typeorm": "0.3.20", "typescript": "6.0.2", @@ -2551,6 +2552,8 @@ "tsyringe": ["tsyringe@4.8.0", "", { "dependencies": { "tslib": "^1.9.3" } }, "sha512-YB1FG+axdxADa3ncEtRnQCFq/M0lALGLxSZeVNbTU8NqhOVc51nnv2CISTcvc1kyv6EGPtXVr0v6lWeDxiijOA=="], + "tunnel": ["tunnel@0.0.6", "", {}, "sha512-1h/Lnq9yajKY2PEbBadPXj3VxsDDu844OnaAo52UVmIzIvwwtBPIuNvkjuzBlTWpfJyUbG3ez0KSBibQkj4ojg=="], + "tunnel-agent": ["tunnel-agent@0.6.0", "", { "dependencies": { "safe-buffer": "^5.0.1" } }, "sha512-McnNiV1l8RYeY8tBgEpuodCC1mLUdbSN+CYBL7kJsJNInOP8UjDDEwdk6Mw60vdLLrr5NHKZhMAOSrR2NZuQ+w=="], "turbo-stream": ["turbo-stream@2.2.0", "", {}, "sha512-FKFg7A0To1VU4CH9YmSMON5QphK0BXjSoiC7D9yMh+mEEbXLUP9qJ4hEt1qcjKtzncs1OpcnjZO8NgrlVbZH+g=="], diff --git a/test/js/node/http/node-http-connect.test.ts b/test/js/node/http/node-http-connect.test.ts index 5e486e48889..71c1c9a4ba3 100644 --- a/test/js/node/http/node-http-connect.test.ts +++ b/test/js/node/http/node-http-connect.test.ts @@ -440,7 +440,434 @@ describe("HTTP server socket access via normal requests", () => { }); }); +describe("HTTP client CONNECT", () => { + test("http.request CONNECT tunnels through a proxy and emits 'connect'", async () => { + // A minimal CONNECT proxy that echoes tunneled bytes back. + const proxyServer = http.createServer(); + let target = ""; + let proxySocket: net.Socket | undefined; + proxyServer.on("connect", (req, clientSocket) => { + proxySocket = clientSocket; + target = req.url ?? ""; + clientSocket.on("error", () => {}); + clientSocket.write("HTTP/1.1 200 Connection established\r\nProxy-Agent: bun-test\r\n\r\n"); + clientSocket.on("data", d => clientSocket.write(d)); + }); + await once(proxyServer.listen(0, "127.0.0.1"), "listening"); + const { port } = proxyServer.address() as AddressInfo; + + try { + const { promise, resolve, reject } = Promise.withResolvers<{ + statusCode: number; + headers: Record; + echoed: string; + socketIsTunnel: boolean; + reqResIsRes: boolean; + upgrade: unknown; + closeListeners: number; + }>(); + + const req = http.request({ method: "CONNECT", host: "127.0.0.1", port, path: "example.com:443" }); + req.on("connect", (res, socket, head) => { + // Node wires res.socket to the tunnel socket, req.res to the response, + // and marks res.upgrade === true. + const socketIsTunnel = res.socket === socket; + const reqResIsRes = req.res === res; + const upgrade = (res as any).upgrade; + // Node hands the tunnel socket to 'connect' with no internal listeners; + // capture the 'close' listener count before we attach our own. + const closeListeners = socket.listenerCount("close"); + socket.on("error", () => {}); + socket.on("data", d => { + resolve({ + statusCode: res.statusCode, + headers: res.headers, + echoed: d.toString(), + socketIsTunnel, + reqResIsRes, + upgrade, + closeListeners, + }); + socket.destroy(); + }); + socket.write("ping"); + }); + req.on("error", reject); + req.end(); + + const result = await promise; + // The tunnel target must be sent verbatim (no leading slash). + expect(target).toBe("example.com:443"); + expect(result.statusCode).toBe(200); + expect(result.headers["proxy-agent"]).toBe("bun-test"); + expect(result.echoed).toBe("ping"); + expect(result.socketIsTunnel).toBe(true); + expect(result.reqResIsRes).toBe(true); + expect(result.upgrade).toBe(true); + // The socket is handed over with no internal listeners, like Node. + expect(result.closeListeners).toBe(0); + } finally { + proxySocket?.destroy(); + await new Promise(r => proxyServer.close(() => r())); + } + }); + + test("http.request CONNECT emits 'connect' even on a non-200 status", async () => { + const proxyServer = net.createServer(socket => { + socket.on("error", () => {}); + socket.on("data", () => socket.write("HTTP/1.1 403 Forbidden\r\nX-Reason: denied\r\n\r\n")); + }); + await once(proxyServer.listen(0, "127.0.0.1"), "listening"); + const { port } = proxyServer.address() as AddressInfo; + + try { + const { promise, resolve, reject } = Promise.withResolvers<{ + statusCode: number; + statusMessage: string; + headers: Record; + }>(); + + const req = http.request({ method: "CONNECT", host: "127.0.0.1", port, path: "example.com:443" }); + req.on("connect", (res, socket) => { + resolve({ statusCode: res.statusCode, statusMessage: res.statusMessage, headers: res.headers }); + socket.destroy(); + }); + req.on("error", reject); + req.end(); + + const result = await promise; + expect(result.statusCode).toBe(403); + expect(result.statusMessage).toBe("Forbidden"); + expect(result.headers["x-reason"]).toBe("denied"); + } finally { + await new Promise(r => proxyServer.close(() => r())); + } + }); + + test("http.request CONNECT trims optional whitespace around proxy header values", async () => { + const proxyServer = net.createServer(socket => { + socket.on("error", () => {}); + // Leading tab, two leading spaces, and a trailing space — llhttp trims all. + socket.on("data", () => + socket.write("HTTP/1.1 200 OK\r\nX-Tab:\tt-val\r\nX-Two: two-val\r\nX-Trail: trail-val \r\n\r\n"), + ); + }); + await once(proxyServer.listen(0, "127.0.0.1"), "listening"); + const { port } = proxyServer.address() as AddressInfo; + + try { + const { promise, resolve, reject } = Promise.withResolvers>(); + const req = http.request({ method: "CONNECT", host: "127.0.0.1", port, path: "h:1" }); + req.on("connect", (res, socket) => { + resolve(res.headers); + socket.destroy(); + }); + req.on("error", reject); + req.end(); + + const headers = await promise; + expect(headers["x-tab"]).toBe("t-val"); + expect(headers["x-two"]).toBe("two-val"); + expect(headers["x-trail"]).toBe("trail-val"); + } finally { + await new Promise(r => proxyServer.close(() => r())); + } + }); + + test("http.request CONNECT folds duplicate proxy response headers like Node", async () => { + const proxyServer = net.createServer(socket => { + socket.on("error", () => {}); + socket.on("data", () => + socket.write( + "HTTP/1.1 200 OK\r\nSet-Cookie: a=1\r\nSet-Cookie: b=2\r\nContent-Length: 10\r\nContent-Length: 20\r\nX-Multi: p\r\nX-Multi: q\r\nConstructor: own\r\n\r\n", + ), + ); + }); + await once(proxyServer.listen(0, "127.0.0.1"), "listening"); + const { port } = proxyServer.address() as AddressInfo; + + try { + const { promise, resolve, reject } = Promise.withResolvers>(); + const req = http.request({ method: "CONNECT", host: "127.0.0.1", port, path: "h:1" }); + req.on("connect", (res, socket) => { + resolve(res.headers); + socket.destroy(); + }); + req.on("error", reject); + req.end(); + + const headers = await promise; + // set-cookie is always an array; singleton headers keep the first value; + // other duplicates are comma-joined. + expect(headers["set-cookie"]).toEqual(["a=1", "b=2"]); + expect(headers["content-length"]).toBe("10"); + expect(headers["x-multi"]).toBe("p, q"); + // A header whose name collides with Object.prototype must fold against an + // absent own property, not the inherited one. + expect(headers["constructor"]).toBe("own"); + } finally { + await new Promise(r => proxyServer.close(() => r())); + } + }); + + test("http.request CONNECT delivers bytes received after the headers as 'head'", async () => { + const proxyServer = net.createServer(socket => { + socket.on("error", () => {}); + socket.on("data", () => socket.write("HTTP/1.1 200 OK\r\n\r\nEARLY-DATA")); + }); + await once(proxyServer.listen(0, "127.0.0.1"), "listening"); + const { port } = proxyServer.address() as AddressInfo; + + try { + const { promise, resolve, reject } = Promise.withResolvers(); + const req = http.request({ method: "CONNECT", host: "127.0.0.1", port, path: "h:1" }); + req.on("connect", (res, socket, head) => { + expect(head).toBeInstanceOf(Buffer); + resolve(head.toString()); + socket.destroy(); + }); + req.on("error", reject); + req.end(); + + expect(await promise).toBe("EARLY-DATA"); + } finally { + await new Promise(r => proxyServer.close(() => r())); + } + }); + + test("http.request CONNECT buffers post-tunnel data until a listener is attached", async () => { + // Node resets the tunnel socket to the neutral (non-flowing) state before + // emitting 'connect', so bytes arriving after the headers are buffered and a + // 'data' listener attached later (e.g. after an await) still receives them. + const proxySockets: net.Socket[] = []; + const proxyServer = net.createServer(socket => { + proxySockets.push(socket); + socket.on("error", () => {}); + socket.on("data", () => { + socket.write("HTTP/1.1 200 Connection established\r\n\r\n"); + // Send the tunneled bytes in a separate write, a short moment after the + // headers, so they land in a TCP read distinct from the header block + // (not coalesced into it, which would deliver them as 'head' instead). + setTimeout(() => socket.write("LATE-DATA"), 20); + }); + }); + await once(proxyServer.listen(0, "127.0.0.1"), "listening"); + const { port } = proxyServer.address() as AddressInfo; + + try { + const { promise, resolve, reject } = Promise.withResolvers<{ flowing: unknown; data: string }>(); + const req = http.request({ method: "CONNECT", host: "127.0.0.1", port, path: "h:1" }); + req.on("connect", async (res, socket) => { + const flowing = (socket as any).readableFlowing; + socket.on("error", () => {}); + // Let the event loop turn so the post-header bytes physically arrive at + // the socket before the listener is attached; they must be buffered (not + // dropped) until now. There is no passive "data buffered" signal to wait + // on here — while flowing === null the bytes sit at the socket handle + // (readableLength stays 0) on both Node and Bun until a consumer resumes + // the stream, so a short yield is the condition that exercises this. + await Bun.sleep(50); + let data = ""; + socket.on("data", d => { + data += d.toString(); + if (data.includes("LATE-DATA")) { + resolve({ flowing, data }); + socket.destroy(); + } + }); + }); + req.on("error", reject); + req.end(); + + const result = await promise; + // Matches Node: socket handed over in the neutral state, data buffered. + expect(result.flowing).toBe(null); + expect(result.data).toBe("LATE-DATA"); + } finally { + for (const s of proxySockets) s.destroy(); + await new Promise(r => proxyServer.close(() => r())); + } + }); + + test("http.request CONNECT emits 'error' then 'close' when the proxy is unreachable", async () => { + // Bind then immediately close to obtain a port nothing listens on. + const tmp = net.createServer(); + await once(tmp.listen(0, "127.0.0.1"), "listening"); + const { port } = tmp.address() as AddressInfo; + await new Promise(r => tmp.close(() => r())); + + const { promise, resolve, reject } = Promise.withResolvers<{ + code: string; + syscall: string; + address: string; + port: number; + events: string[]; + }>(); + const events: string[] = []; + const req = http.request({ method: "CONNECT", host: "127.0.0.1", port, path: "example.com:443" }); + req.on("connect", () => reject(new Error("unexpected connect"))); + let err: NodeJS.ErrnoException | undefined; + req.on("error", e => { + events.push("error"); + err = e as NodeJS.ErrnoException; + }); + // Node emits 'close' on the request after a failed connection. + req.on("close", () => { + events.push("close"); + resolve({ + code: err?.code ?? "", + syscall: err?.syscall ?? "", + address: err?.address ?? "", + port: err?.port ?? 0, + events, + }); + }); + req.end(); + + const result = await promise; + expect(result.code).toBe("ECONNREFUSED"); + expect(result.events).toEqual(["error", "close"]); + // The net error is propagated verbatim (Node-shaped), so the diagnostic + // fields survive rather than being flattened to a bare Error. + expect(result.syscall).toBe("connect"); + expect(result.address).toBe("127.0.0.1"); + expect(result.port).toBe(port); + }); + + test("http.request CONNECT rejects a malformed proxy status line with 'error'", async () => { + const proxyServer = net.createServer(socket => { + socket.on("error", () => {}); + // Not a valid HTTP status line, but terminated like a header block. + socket.on("data", () => socket.write("garbage not http\r\nX: y\r\n\r\n")); + }); + await once(proxyServer.listen(0, "127.0.0.1"), "listening"); + const { port } = proxyServer.address() as AddressInfo; + + try { + const { promise, resolve, reject } = Promise.withResolvers(); + const req = http.request({ method: "CONNECT", host: "127.0.0.1", port, path: "example.com:443" }); + req.on("connect", () => reject(new Error("unexpected connect on malformed response"))); + req.on("error", err => resolve((err as NodeJS.ErrnoException).code ?? err.message)); + req.end(); + + // Node surfaces an llhttp parse error (HPE_*). We only require that it's an + // error rather than a bogus tunnel, so assert the code class loosely. + const code = await promise; + expect(code).toContain("HPE_"); + } finally { + await new Promise(r => proxyServer.close(() => r())); + } + }); + + test("http.request CONNECT rejects a header block larger than maxHeaderSize", async () => { + // Oversized headers delivered complete (with the terminator) in one write + // must still be rejected, matching Node's llhttp byte counting. + const proxyServer = net.createServer(socket => { + socket.on("error", () => {}); + socket.on("data", () => + socket.write("HTTP/1.1 200 OK\r\nX: " + Buffer.alloc(20000, "a").toString() + "\r\n\r\n"), + ); + }); + await once(proxyServer.listen(0, "127.0.0.1"), "listening"); + const { port } = proxyServer.address() as AddressInfo; + + try { + const { promise, resolve, reject } = Promise.withResolvers(); + const req = http.request({ method: "CONNECT", host: "127.0.0.1", port, path: "h:1", maxHeaderSize: 16384 }); + req.on("connect", () => reject(new Error("unexpected connect on oversized headers"))); + req.on("error", err => resolve((err as NodeJS.ErrnoException).code ?? err.message)); + req.end(); + + expect(await promise).toBe("HPE_HEADER_OVERFLOW"); + } finally { + await new Promise(r => proxyServer.close(() => r())); + } + }); + + test("http.request CONNECT destroyed before the proxy responds emits 'close' without crashing", async () => { + // A proxy that accepts the TCP connection but never sends a response. + const proxySockets: net.Socket[] = []; + const proxyServer = net.createServer(socket => { + socket.on("error", () => {}); + proxySockets.push(socket); + }); + await once(proxyServer.listen(0, "127.0.0.1"), "listening"); + const { port } = proxyServer.address() as AddressInfo; + + try { + const { promise, resolve } = Promise.withResolvers(); + const events: string[] = []; + const req = http.request({ method: "CONNECT", host: "127.0.0.1", port, path: "h:1" }); + req.on("connect", () => events.push("connect")); + // A spurious 'error' after 'close' (or an unhandled AbortError) would be a bug. + req.on("error", e => events.push("error:" + ((e as NodeJS.ErrnoException).code ?? e.message))); + req.on("close", () => { + events.push("close"); + resolve(events); + }); + req.end(); + // Destroy before the proxy has written anything. + await once(req, "socket"); + req.destroy(); + + const result = await promise; + // The request must close; it must not emit a spurious post-close error. + expect(result).toContain("close"); + expect(result[result.length - 1]).toBe("close"); + } finally { + for (const s of proxySockets) s.destroy(); + await new Promise(r => proxyServer.close(() => r())); + } + }); + + test("http.request CONNECT resolves the proxy host with a custom lookup", async () => { + const proxyServer = http.createServer(); + let proxySocket: net.Socket | undefined; + proxyServer.on("connect", (req, clientSocket) => { + proxySocket = clientSocket; + clientSocket.on("error", () => {}); + clientSocket.write("HTTP/1.1 200 Connection established\r\n\r\n"); + }); + await once(proxyServer.listen(0, "127.0.0.1"), "listening"); + const { port } = proxyServer.address() as AddressInfo; + + try { + let lookupCalledWith = ""; + const { promise, resolve, reject } = Promise.withResolvers(); + const req = http.request({ + method: "CONNECT", + // A name that only the custom lookup can resolve. + host: "proxy.invalid.test", + port, + path: "example.com:443", + lookup: (hostname: string, _opts: any, cb: any) => { + lookupCalledWith = hostname; + // net.connect() defaults autoSelectFamily on, so the all-addresses + // array form is what both Node and Bun expect here. + cb(null, [{ address: "127.0.0.1", family: 4 }]); + }, + }); + req.on("connect", (res, socket) => { + socket.destroy(); + resolve(res.statusCode); + }); + req.on("error", reject); + req.end(); + + const statusCode = await promise; + expect(lookupCalledWith).toBe("proxy.invalid.test"); + expect(statusCode).toBe(200); + } finally { + proxySocket?.destroy(); + await new Promise(r => proxyServer.close(() => r())); + } + }); +}); + describe("Should be compatible with node.js", () => { + // These spawn a full `node --test` / `bun test` run of the sibling file, which + // can take several seconds. Give them a generous timeout so they don't race the + // default 5s deadline on a loaded CI machine. test("tests should run on node.js", async () => { const process = Bun.spawn({ cmd: [nodeExe(), "--test", join(import.meta.dir, "node-http-connect.node.mts")], @@ -450,7 +877,7 @@ describe("Should be compatible with node.js", () => { env: bunEnv, }); expect(await process.exited).toBe(0); - }); + }, 30_000); test("tests should run on bun", async () => { const process = Bun.spawn({ cmd: [bunExe(), "test", join(import.meta.dir, "node-http-connect.node.mts")], @@ -460,5 +887,5 @@ describe("Should be compatible with node.js", () => { env: bunEnv, }); expect(await process.exited).toBe(0); - }); + }, 30_000); }); diff --git a/test/js/node/http/node-http.test.ts b/test/js/node/http/node-http.test.ts index 6c3b2a1d25a..9cbd45b96a1 100644 --- a/test/js/node/http/node-http.test.ts +++ b/test/js/node/http/node-http.test.ts @@ -23,7 +23,8 @@ import http, { } from "node:http"; import https, { createServer as createHttpsServer } from "node:https"; import type { AddressInfo } from "node:net"; -import { connect } from "node:net"; +import { connect, createServer as createNetServer } from "node:net"; +import tunnel from "tunnel"; import { tmpdir } from "node:os"; import * as path from "node:path"; import { PassThrough } from "node:stream"; @@ -829,6 +830,99 @@ describe("node:http", () => { await runHTTPProxyTest(); }); + // https://github.com/oven-sh/bun/issues/31795 + // A custom agent (the `tunnel` package) establishes an HTTP CONNECT tunnel + // by overriding addRequest/createSocket and calling req.onSocket(). Bun + // must route through those hooks and emit a CONNECT, not bypass the proxy. + it("uses a tunnel.httpsOverHttp() agent and sends CONNECT to the proxy", async () => { + const { promise: connectLine, resolve: gotConnect, reject: connectFailed } = Promise.withResolvers(); + + // Minimal proxy that records the CONNECT request it receives and replies + // 502 so the tunnel fails the same way it does in Node. + await using proxy = createNetServer(socket => { + socket.once("data", buf => { + gotConnect(buf.toString()); + socket.end("HTTP/1.1 502 Bad Gateway\r\nContent-Length: 0\r\n\r\n"); + }); + }); + await once(proxy.listen(0, "127.0.0.1"), "listening"); + const proxyPort = (proxy.address() as AddressInfo).port; + + const agent = tunnel.httpsOverHttp({ proxy: { host: "127.0.0.1", port: proxyPort } }); + + const { promise: requestDone, resolve: requestResolve } = Promise.withResolvers<{ + status?: number; + error?: string; + }>(); + const req = https.request( + { host: "example.com", port: 443, path: "/", method: "GET", agent }, + res => { + res.resume(); + requestResolve({ status: res.statusCode }); + }, + ); + req.on("error", err => requestResolve({ error: (err as Error).message })); + req.end(); + + // The proxy must receive a CONNECT targeting the requested host:port. + const received = await connectLine; + expect(received).toContain("CONNECT example.com:443 HTTP/1.1"); + expect(received.toLowerCase()).toContain("host: example.com:443"); + + // And the request must surface the proxy's failure, not a direct 200. + const result = await requestDone; + expect(result.status).toBeUndefined(); + expect(result.error).toContain("statusCode=502"); + }); + + // Full success path: the proxy accepts the CONNECT and pipes bytes through, + // so the tunneled request receives the target server's response. + it("tunnels a request through an HTTP CONNECT proxy (tunnel.httpOverHttp)", async () => { + await using target = createServer((req, res) => { + res.writeHead(200, { "x-tunneled": "yes" }); + res.end("through-the-tunnel"); + }); + await once(target.listen(0, "127.0.0.1"), "listening"); + const targetPort = (target.address() as AddressInfo).port; + + let connectTarget: string | undefined; + await using proxy = createServer(); + proxy.on("connect", (req, clientSocket, head) => { + connectTarget = req.url as string; + const [host, port] = (req.url as string).split(":"); + const serverSocket = connect(Number(port), host, () => { + clientSocket.write("HTTP/1.1 200 Connection Established\r\n\r\n"); + if (head?.length) serverSocket.write(head); + serverSocket.pipe(clientSocket); + clientSocket.pipe(serverSocket); + }); + serverSocket.on("error", () => clientSocket.end()); + }); + await once(proxy.listen(0, "127.0.0.1"), "listening"); + const proxyPort = (proxy.address() as AddressInfo).port; + + const agent = tunnel.httpOverHttp({ proxy: { host: "127.0.0.1", port: proxyPort } }); + + const { promise, resolve, reject } = Promise.withResolvers<{ status: number; header?: string; body: string }>(); + const req = http.request( + { host: "127.0.0.1", port: targetPort, path: "/", method: "GET", agent }, + res => { + let body = ""; + res.setEncoding("utf8"); + res.on("data", c => (body += c)); + res.on("end", () => resolve({ status: res.statusCode as number, header: res.headers["x-tunneled"], body })); + }, + ); + req.on("error", reject); + req.end(); + + const result = await promise; + expect(result).toEqual({ status: 200, header: "yes", body: "through-the-tunnel" }); + // The request must have gone through the proxy's CONNECT handler, not a + // direct connection to the target (which would bypass the agent). + expect(connectTarget).toBe(`127.0.0.1:${targetPort}`); + }); + it("should correctly stream a multi-chunk response #5320", async done => { runTest(done, (server, serverPort, done) => { const req = request({ diff --git a/test/package.json b/test/package.json index c24224c77d4..1301df5fb19 100644 --- a/test/package.json +++ b/test/package.json @@ -98,6 +98,7 @@ "supertest": "6.3.3", "svelte": "5.20.4", "tsyringe": "4.8.0", + "tunnel": "0.0.6", "type-graphql": "2.0.0-rc.2", "typeorm": "0.3.20", "typescript": "6.0.2", From 2ffde14603161ac959a33abcd0259c7d00558a0c Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Thu, 4 Jun 2026 09:14:11 +0000 Subject: [PATCH 2/2] [autofix.ci] apply automated fixes --- test/js/node/http/node-http.test.ts | 28 +++++++++++----------------- 1 file changed, 11 insertions(+), 17 deletions(-) diff --git a/test/js/node/http/node-http.test.ts b/test/js/node/http/node-http.test.ts index 9cbd45b96a1..d36be8e6528 100644 --- a/test/js/node/http/node-http.test.ts +++ b/test/js/node/http/node-http.test.ts @@ -24,10 +24,10 @@ import http, { import https, { createServer as createHttpsServer } from "node:https"; import type { AddressInfo } from "node:net"; import { connect, createServer as createNetServer } from "node:net"; -import tunnel from "tunnel"; import { tmpdir } from "node:os"; import * as path from "node:path"; import { PassThrough } from "node:stream"; +import tunnel from "tunnel"; import { run as runHTTPProxyTest } from "./node-http-proxy.js"; const { describe, expect, it, beforeAll, afterAll, createDoneDotAll, mock, test } = createTest(import.meta.path); @@ -854,13 +854,10 @@ describe("node:http", () => { status?: number; error?: string; }>(); - const req = https.request( - { host: "example.com", port: 443, path: "/", method: "GET", agent }, - res => { - res.resume(); - requestResolve({ status: res.statusCode }); - }, - ); + const req = https.request({ host: "example.com", port: 443, path: "/", method: "GET", agent }, res => { + res.resume(); + requestResolve({ status: res.statusCode }); + }); req.on("error", err => requestResolve({ error: (err as Error).message })); req.end(); @@ -904,15 +901,12 @@ describe("node:http", () => { const agent = tunnel.httpOverHttp({ proxy: { host: "127.0.0.1", port: proxyPort } }); const { promise, resolve, reject } = Promise.withResolvers<{ status: number; header?: string; body: string }>(); - const req = http.request( - { host: "127.0.0.1", port: targetPort, path: "/", method: "GET", agent }, - res => { - let body = ""; - res.setEncoding("utf8"); - res.on("data", c => (body += c)); - res.on("end", () => resolve({ status: res.statusCode as number, header: res.headers["x-tunneled"], body })); - }, - ); + const req = http.request({ host: "127.0.0.1", port: targetPort, path: "/", method: "GET", agent }, res => { + let body = ""; + res.setEncoding("utf8"); + res.on("data", c => (body += c)); + res.on("end", () => resolve({ status: res.statusCode as number, header: res.headers["x-tunneled"], body })); + }); req.on("error", reject); req.end();