Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 191 additions & 0 deletions src/js/node/_http_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,13 @@ const {
reqSymbol,
callCloseCallback,
emitCloseNTAndComplete,
bodyStreamSymbol,
statusCodeSymbol,
statusMessageSymbol,
} = require("internal/http");

const { HTTPParser, freeParser } = require("node:_http_common");

const { globalAgent } = require("node:_http_agent");
const { IncomingMessage } = require("node:_http_incoming");
const { OutgoingMessage } = require("node:_http_outgoing");
Expand Down Expand Up @@ -266,11 +271,195 @@ function ClientRequest(input, options, cb) {

let fetching = false;

const startFetchViaSocket = () => {
fetching = true;

const method = this[kMethod];
const path = this[kPath];
const host = this[kHost];
const port = this[kPort];
const protocol = this[kProtocol];
const socketPath = this[kSocketPath];

// Pass full options to createConnection, matching Node.js behavior.
// The options object is compatible with net.connect() / tls.connect().
const connectOptions = { ...options, host, port, path: socketPath || undefined };

let socket;
try {
socket = createConnection(connectOptions);
} catch (err) {
process.nextTick((self, e) => self.emit("error", e), this, err);
return false;
}

this.socket = socket;
this[kAbortController]?.signal?.addEventListener("abort", () => socket.destroy(), { once: true });

// --- Write HTTP/1.1 request ---
const writeRequest = () => {
const headers = this.getHeaders();
let head = `${method} ${path} HTTP/1.1\r\n`;

if (!headers.host && !headers.Host) {
const dp = protocol === "https:" ? 443 : 80;
const hostStr = isIPv6(host) ? `[${host}]` : host;
head += port && port !== dp ? `Host: ${hostStr}:${port}\r\n` : `Host: ${hostStr}\r\n`;
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

const chunks = this[kBodyChunks];
let body;
if (chunks?.length > 0) {
const bufs = chunks.map(c => (typeof c === "string" ? Buffer.from(c) : c));
body = bufs.length === 1 ? bufs[0] : Buffer.concat(bufs);
}

for (const key of Object.keys(headers)) {
const val = headers[key];
if (val === undefined) continue;
if ($isJSArray(val)) {
for (const v of val) head += `${key}: ${v}\r\n`;
} else {
head += `${key}: ${val}\r\n`;
}
}

if (body) head += `Content-Length: ${body.byteLength}\r\n`;
head += "\r\n";
socket.write(head);
if (body) socket.write(body);
Comment thread
coderabbitai[bot] marked this conversation as resolved.
};

// --- Parse HTTP/1.1 response using llhttp (via HTTPParser) ---
const parser = new HTTPParser();
parser._headers = [];
parser._url = "";
parser.maxHeaderPairs = 2000;
parser.socket = socket;
parser.initialize(HTTPParser.RESPONSE, {});

let res: any = null;

const responseComplete = () => {
if (res && !res.complete) {
res.push(null);
res.complete = true;
}
this[kClearTimeout]();
fetching = false;
this[kFetchRequest] = null;
freeParser(parser, this, socket);
maybeEmitClose();
};

// Build headers object from the flat [key, val, key, val, ...] array llhttp produces
const buildHeaders = (rawHeaders: string[]) => {
const headers: any = Object.create(null);
for (let i = 0; i < rawHeaders.length; i += 2) {
const lk = rawHeaders[i].toLowerCase();
const v = rawHeaders[i + 1];
if (lk === "set-cookie") headers[lk] = headers[lk] ? [...headers[lk], v] : [v];
else headers[lk] = v;
}
return headers;
};

parser[HTTPParser.kOnHeadersComplete] = (vMaj, vMin, headers, _method, _url, statusCode, statusMessage, upgrade, shouldKeepAlive) => {
if (headers === undefined) {
headers = parser._headers;
parser._headers = [];
}

const prevIsHTTPS = getIsNextIncomingMessageHTTPS();
setIsNextIncomingMessageHTTPS(protocol === "https:");
res = new IncomingMessage(null, {});
setIsNextIncomingMessageHTTPS(prevIsHTTPS);

res[statusCodeSymbol] = statusCode;
res[statusMessageSymbol] = statusMessage;
res.httpVersion = `${vMaj}.${vMin}`;
res.headers = buildHeaders(headers);
res.rawHeaders = headers;
res[bodyStreamSymbol] = true; // Prevent _read from accessing fetch APIs
res.socket = socket;
this.res = res;
res.req = this;
this[kClearTimeout]();

if (this.aborted) { maybeEmitClose(); return 2; }
if (!this.emit("response", res)) res._dump();
maybeEmitClose();

// Return value: 0 = parse body, 1 = skip body (HEAD)
return method === "HEAD" ? 1 : 0;
};

parser[HTTPParser.kOnBody] = (chunk) => {
if (res && !res._dumped) res.push(chunk);
};

parser[HTTPParser.kOnMessageComplete] = () => {
// Handle trailing headers — override the noop prototype getter/setter
if (parser._headers.length && res) {
const trailers = buildHeaders(parser._headers);
const rawTrailers = parser._headers.slice();
Object.defineProperty(res, "trailers", { value: trailers, writable: true, enumerable: true, configurable: true });
Object.defineProperty(res, "rawTrailers", { value: rawTrailers, writable: true, enumerable: true, configurable: true });
parser._headers = [];
}
responseComplete();
};
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated

parser[HTTPParser.kOnHeaders] = (headers) => {
// Accumulate trailing headers (called when headers arrive in fragments or as trailers)
parser._headers.push(...headers);
};

socket.on("data", (chunk) => {
const ret = parser.execute(chunk);
if (ret instanceof Error) {
socket.destroy();
this.emit("error", ret);
}
});
socket.on("error", (err) => {
if (isAbortError(err)) return;
try { this.emit("error", err); } catch (e) {
if (!!$debug) globalReportError(e);
}
});
socket.on("end", () => {
parser.finish();
if (res && !res.complete) responseComplete();
});
socket.on("close", () => {
if (res && !res.complete) responseComplete();
socketCloseListener();
});
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
Comment thread
coderabbitai[bot] marked this conversation as resolved.

// Wait for connection (TCP or TLS handshake) before writing, matching http2 pattern
if (socket.connecting || socket.secureConnecting) {
const connectEvent = socket.secureConnecting ? "secureConnect" : "connect";
socket.once(connectEvent, writeRequest);
} else {
process.nextTick(writeRequest);
}

return true;
};

const startFetch = (customBody?) => {
if (fetching) {
return false;
}

// Socket-based path: when createConnection is provided, bypass the fetch
// infrastructure and use raw HTTP/1.1 over the user-provided socket.
if (typeof createConnection === "function") {
if (!this.finished) return false; // Wait until end() is called
return startFetchViaSocket();
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

fetching = true;

const method = this[kMethod];
Expand Down Expand Up @@ -918,6 +1107,8 @@ function ClientRequest(input, options, cb) {
const { signal: _signal, ...optsWithoutSignal } = options;
this[kOptions] = optsWithoutSignal;

const createConnection = options.createConnection;

this._httpMessage = this;

process.nextTick(emitContinueAndSocketNT, this);
Expand Down
Loading