Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
352 changes: 352 additions & 0 deletions src/js/node/_http_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,16 @@ const {
reqSymbol,
callCloseCallback,
emitCloseNTAndComplete,
bodyStreamSymbol,
statusCodeSymbol,
statusMessageSymbol,
} = require("internal/http");

const { HTTPParser, freeParser } = require("node:_http_common");

const { globalAgent } = require("node:_http_agent");
const { getLazy } = require("internal/shared");
const tls = getLazy(() => require("node:tls"));
const { IncomingMessage } = require("node:_http_incoming");
const { OutgoingMessage } = require("node:_http_outgoing");

Expand Down Expand Up @@ -266,11 +273,354 @@ function ClientRequest(input, options, cb) {

let fetching = false;

const startFetchViaSocket = () => {
fetching = true;

const method = this[kMethod];
const path = this[kPath];
const host = this[kHost];
const port = this[kPort];
const protocol = this[kProtocol];
const socketPath = this[kSocketPath];

// Pass full options to createConnection, matching Node.js behavior.
// The options object is compatible with net.connect() / tls.connect().
const connectOptions = { ...options, host, port, path: socketPath || undefined };

let socket;
try {
socket = createConnection(connectOptions);
} catch (err) {
process.nextTick((self, e) => self.emit("error", e), this, err);
return false;
}

this.socket = socket;
this[kAbortController]?.signal?.addEventListener("abort", () => socket.destroy(), { once: true });

// --- Write HTTP/1.1 request ---
const writeRequest = () => {
const headers = this.getHeaders();
let head = `${method} ${path} HTTP/1.1\r\n`;

if (!headers.host && !headers.Host) {
const dp = protocol === "https:" ? 443 : 80;
const hostStr = isIPv6(host) ? `[${host}]` : host;
head += port && port !== dp ? `Host: ${hostStr}:${port}\r\n` : `Host: ${hostStr}\r\n`;
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

const chunks = this[kBodyChunks];
let body;
if (chunks?.length > 0) {
const bufs = chunks.map(c => (typeof c === "string" ? Buffer.from(c) : c));
body = bufs.length === 1 ? bufs[0] : Buffer.concat(bufs);
}

// Check if caller already set framing headers
let hasContentLength = false;
let hasTransferEncoding = false;
for (const key of Object.keys(headers)) {
const lk = key.toLowerCase();
if (lk === "content-length") hasContentLength = true;
else if (lk === "transfer-encoding") hasTransferEncoding = true;
const val = headers[key];
if (val === undefined) continue;
if ($isJSArray(val)) {
for (const v of val) head += `${key}: ${v}\r\n`;
} else {
head += `${key}: ${val}\r\n`;
}
}

// Only add Content-Length if caller didn't set framing headers
if (body && !hasContentLength && !hasTransferEncoding) {
head += `Content-Length: ${body.byteLength}\r\n`;
}
head += "\r\n";
socket.write(head);
if (body) socket.write(body);
Comment thread
coderabbitai[bot] marked this conversation as resolved.
};

// --- Parse HTTP/1.1 response using llhttp (via HTTPParser) ---
const parser = new HTTPParser();
parser._headers = [];
parser._url = "";
parser.maxHeaderPairs = 2000;
parser.socket = socket;
parser.initialize(HTTPParser.RESPONSE, {});

let res: any = null;
let parserFreed = false;
let upgraded = false;
let pendingUpgrade: { res: any; event: string } | null = null;

const safelyFreeParser = () => {
if (!parserFreed) {
parserFreed = true;
freeParser(parser, this, socket);
}
};

const responseComplete = () => {
if (res && !res.complete) {
res.push(null);
res.complete = true;
}
this[kClearTimeout]();
fetching = false;
this[kFetchRequest] = null;
safelyFreeParser();
maybeEmitClose();
};

// Build headers object from the flat [key, val, key, val, ...] array llhttp produces.
// Duplicates are joined with ", " (matching Node.js), except set-cookie which is an array.
const buildHeaders = (rawHeaders: string[]) => {
const headers: any = Object.create(null);
for (let i = 0; i < rawHeaders.length; i += 2) {
const lk = rawHeaders[i].toLowerCase();
const v = rawHeaders[i + 1];
if (lk === "set-cookie") {
headers[lk] = headers[lk] ? [...headers[lk], v] : [v];
} else if (headers[lk] !== undefined) {
headers[lk] += ", " + v;
} else {
headers[lk] = v;
}
}
return headers;
};

parser[HTTPParser.kOnHeadersComplete] = (
vMaj,
vMin,
headers,
_method,
_url,
statusCode,
statusMessage,
upgrade,
shouldKeepAlive,
) => {
if (headers === undefined) {
headers = parser._headers;
parser._headers = [];
}

// 1xx informational responses (100 Continue, 103 Early Hints, etc.)
// are not terminal — emit "information" and let the parser continue
// waiting for the final response. Note: 101 Switching Protocols is
// handled below as an upgrade, not here.
if (statusCode >= 100 && statusCode < 200 && statusCode !== 101) {
this.emit("information", {
statusCode,
statusMessage,
httpVersion: `${vMaj}.${vMin}`,
headers: buildHeaders(headers),
rawHeaders: headers,
});
return 1; // skip body, parser stays active for next response
}

// Upgrade (101 Switching Protocols) or CONNECT tunnel (200) —
// surface the live socket and stop HTTP parsing.
// The emit is deferred: we store the upgrade info and return 2 (skip body).
// The "data" handler captures leftover bytes via chunk.slice(ret) after
// parser.execute() returns, so we don't lose pipelined data.
if (upgrade || (method === "CONNECT" && statusCode === 200)) {
upgraded = true;
const builtHeaders = buildHeaders(headers);

const prevIsHTTPS = getIsNextIncomingMessageHTTPS();
setIsNextIncomingMessageHTTPS(protocol === "https:");
const upgradeRes = new IncomingMessage(null, {});
setIsNextIncomingMessageHTTPS(prevIsHTTPS);

upgradeRes[statusCodeSymbol] = statusCode;
upgradeRes[statusMessageSymbol] = statusMessage;
upgradeRes.httpVersion = `${vMaj}.${vMin}`;
upgradeRes.headers = builtHeaders;
upgradeRes.rawHeaders = headers;
upgradeRes.socket = socket;

// Store for deferred emit in the "data" handler
pendingUpgrade = { res: upgradeRes, event: upgrade ? "upgrade" : "connect" };
safelyFreeParser();
socket.removeListener("data", onData);
socket.removeListener("end", onEnd);
return 1; // skip body
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

const prevIsHTTPS = getIsNextIncomingMessageHTTPS();
setIsNextIncomingMessageHTTPS(protocol === "https:");
res = new IncomingMessage(null, {});
setIsNextIncomingMessageHTTPS(prevIsHTTPS);

res[statusCodeSymbol] = statusCode;
res[statusMessageSymbol] = statusMessage;
res.httpVersion = `${vMaj}.${vMin}`;
res.headers = buildHeaders(headers);
res.rawHeaders = headers;
res[bodyStreamSymbol] = true; // Prevent _read from accessing fetch APIs
res.socket = socket;
this.res = res;
res.req = this;
this[kClearTimeout]();

if (this.aborted) {
maybeEmitClose();
return 1;
}
if (!this.emit("response", res)) res._dump();
maybeEmitClose();

// Return value: 0 = parse body, 1 = skip body (HEAD)
return method === "HEAD" ? 1 : 0;
};

parser[HTTPParser.kOnBody] = chunk => {
if (res && !res._dumped) res.push(chunk);
};

parser[HTTPParser.kOnMessageComplete] = () => {
// For 1xx informational responses, res is not set — don't free the parser,
// just let it continue parsing the next (final) response.
if (!res) return;

// Handle trailing headers — override the noop prototype getter/setter
if (parser._headers.length) {
const trailers = buildHeaders(parser._headers);
const rawTrailers = parser._headers.slice();
Object.defineProperty(res, "trailers", {
value: trailers,
writable: true,
enumerable: true,
configurable: true,
});
Object.defineProperty(res, "rawTrailers", {
value: rawTrailers,
writable: true,
enumerable: true,
configurable: true,
});
parser._headers = [];
}
responseComplete();
};

parser[HTTPParser.kOnHeaders] = headers => {
// Accumulate trailing headers (called when headers arrive in fragments or as trailers)
parser._headers.push(...headers);
};

// Named handlers so they can be removed after upgrade to prevent use-after-free.
const onData = chunk => {
if (parserFreed) {
// After upgrade, if there's a pending upgrade emit, capture leftover bytes
// from the same TCP segment and emit the upgrade event now.
if (pendingUpgrade) {
const { res: upgradeRes, event } = pendingUpgrade;
pendingUpgrade = null;
this.emit(event, upgradeRes, socket, chunk);
}
return;
}
const ret = parser.execute(chunk);
if (ret instanceof Error) {
socket.destroy();
this.emit("error", ret);
return;
}
// After parser.execute(), check if an upgrade was detected. If so,
// emit the upgrade event with any leftover bytes from this chunk.
if (pendingUpgrade) {
const { res: upgradeRes, event } = pendingUpgrade;
pendingUpgrade = null;
const head = typeof ret === "number" && ret < chunk.length ? chunk.slice(ret) : Buffer.alloc(0);
this.emit(event, upgradeRes, socket, head);
}
};
const onEnd = () => {
if (parserFreed) return;
parser.finish();
// If the response is still incomplete after parser.finish(), the connection
// was closed prematurely — surface an error instead of silently completing.
if (res && !res.complete) {
res.destroy(new ConnResetException("aborted"));
}
};
socket.on("data", onData);
socket.on("error", err => {
if (isAbortError(err)) return;
try {
this.emit("error", err);
} catch (e) {
if (!!$debug) globalReportError(e);
}
});
socket.on("end", onEnd);
socket.on("close", () => {
// Handle premature close — but not after a successful upgrade,
// where res is null and the socket close is expected.
if (res && !res.complete) {
res.destroy(new ConnResetException("aborted"));
} else if (!res && !upgraded) {
// EOF before headers — emit error on the request
this.emit("error", new ConnResetException("aborted"));
}
// Free parser resources on any close to avoid leaks
safelyFreeParser();
// Mark the request as closed/destroyed without calling socketCloseListener(),
// which would re-emit "close" on this.socket (the real socket) causing duplicates.
this.destroyed = true;
if (!this._closed) {
this._closed = true;
callCloseCallback(this);
this.emit("close");
}
});
Comment thread
coderabbitai[bot] marked this conversation as resolved.

// Ensure socket is connected before writing.
// Use instanceof for deterministic TLS detection, matching http2.ts pattern.
const isTLSSocket = socket instanceof tls().TLSSocket;
let connected = !socket.connecting && !(isTLSSocket && socket.secureConnecting);
if (!connected) {
const connectEvent = isTLSSocket ? "secureConnect" : "connect";
socket.once(connectEvent, () => {
connected = true;
if (this.finished) writeRequest();
});
}

// Write request when both connected and body is ready.
// If end() was already called (this.finished), write immediately.
// Otherwise, the send() function will trigger writing via resolveNextChunk.
if (this.finished && connected) {
process.nextTick(writeRequest);
} else if (!this.finished) {
// Override resolveNextChunk so that when end() signals completion,
// we write the request to the socket.
const origResolve = resolveNextChunk;
resolveNextChunk = end => {
origResolve?.(end);
if (end && connected) writeRequest();
};
}

return true;
};

const startFetch = (customBody?) => {
if (fetching) {
return false;
}

// Socket-based path: when createConnection is provided, bypass the fetch
// infrastructure and use raw HTTP/1.1 over the user-provided socket.
if (typeof createConnection === "function") {
return startFetchViaSocket();
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

fetching = true;

const method = this[kMethod];
Expand Down Expand Up @@ -918,6 +1268,8 @@ function ClientRequest(input, options, cb) {
const { signal: _signal, ...optsWithoutSignal } = options;
this[kOptions] = optsWithoutSignal;

const createConnection = options.createConnection;

this._httpMessage = this;

process.nextTick(emitContinueAndSocketNT, this);
Expand Down
Loading