Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 75 additions & 15 deletions src/js/node/http2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ const {
validateNumber,
validateAbortSignal,
} = require("internal/validators");
const { kTimeout, getTimerDuration } = require("internal/timers");

let utcCache;

Expand Down Expand Up @@ -2028,6 +2029,10 @@ function assertSession(session) {
hideFromStack(assertSession);

function pushToStream(stream, data) {
// Activity on this stream refreshes its per-stream idle timer so a
// completed/in-flight stream does not prematurely fire 'timeout' while
// data is still being delivered.
stream._unrefTimer();
if (data && stream[bunHTTP2StreamStatus] & StreamState.Closed) {
if (!stream._readableState.ended) {
// closed, but not ended, so resume and push null to end the stream
Expand Down Expand Up @@ -2078,6 +2083,12 @@ function markStreamClosed(stream: Http2Stream) {

if ((status & StreamState.Closed) === 0) {
stream[bunHTTP2StreamStatus] = status | StreamState.Closed;
// Disarm the per-stream idle timer on every close transition (explicit
// close(), _destroy, and the natural END_STREAM completion that only
// calls markStreamClosed and may defer destroy). Mirrors Node's
// closeStream() clearing the timer regardless of the close path.
clearTimeout(stream[kTimeout]);
stream[kTimeout] = null;
publishStreamCloseChannel(stream);

markWritableDone(stream);
Expand All @@ -2097,6 +2108,8 @@ class Http2Stream extends Duplex {
[bunHTTP2StreamStatus]: number = 0;

rstCode: number | undefined = undefined;
timeout: number | undefined = undefined;
[kTimeout]: ReturnType<typeof setTimeout> | null = null;
[bunHTTP2Headers]: any;
[kInfoHeaders]: any;
#sentTrailers: any;
Expand Down Expand Up @@ -2202,10 +2215,39 @@ class Http2Stream extends Duplex {
this.#sentTrailers = headers;
}

setTimeout(timeout, callback) {
const session = this[bunHTTP2Session];
if (!session) return;
session.setTimeout(timeout, callback);
setTimeout(msecs, callback) {
if (this.destroyed) return this;

this.timeout = msecs;

msecs = getTimerDuration(msecs, "msecs");
Comment thread
robobun marked this conversation as resolved.

clearTimeout(this[kTimeout]);

if (msecs === 0) {
if (callback !== undefined) {
validateFunction(callback, "callback");
this.removeListener("timeout", callback);
}
} else {
this[kTimeout] = setTimeout(this._onTimeout.bind(this), msecs).unref();
Comment thread
robobun marked this conversation as resolved.

if (callback !== undefined) {
validateFunction(callback, "callback");
this.once("timeout", callback);
}
}
return this;
}

_onTimeout() {
if (this.destroyed) return;
this.emit("timeout");
}
Comment thread
robobun marked this conversation as resolved.

_unrefTimer() {
const timer = this[kTimeout];
if (timer) timer.refresh();
}
Comment thread
robobun marked this conversation as resolved.
Comment thread
robobun marked this conversation as resolved.

get closed() {
Expand Down Expand Up @@ -2489,6 +2531,7 @@ class Http2Stream extends Duplex {
this.once("ready", this._writev.bind(this, data, callback));
return;
}
this._unrefTimer();
const session = this[bunHTTP2Session];
if (session) {
const native = session[bunHTTP2Native];
Expand Down Expand Up @@ -2532,6 +2575,7 @@ class Http2Stream extends Duplex {
this.once("ready", this._write.bind(this, chunk, encoding, callback));
return;
}
this._unrefTimer();
const session = this[bunHTTP2Session];
if (session) {
const native = session[bunHTTP2Native];
Expand Down Expand Up @@ -2745,6 +2789,10 @@ class ServerHttp2Stream extends Http2Stream {
if (!parser) {
throw $ERR_HTTP2_INVALID_STREAM();
}
// Sending the PUSH_PROMISE HEADERS frame is outbound activity on the
// parent stream; refresh its per-stream idle timer (mirrors Node's
// kUpdateTimer in pushStream()).
this._unrefTimer();
headers = { ...headers };
assertNoConnectionHeaders(headers);
const sensitives = headers[sensitiveHeaders];
Expand Down Expand Up @@ -2921,6 +2969,11 @@ class ServerHttp2Stream extends Http2Stream {
throw $ERR_HTTP2_HEADERS_AFTER_RESPOND();
}

// Sending a 1xx informational HEADERS frame is outbound stream activity;
// refresh the per-stream idle timer (mirrors Node's kUpdateTimer in
// additionalHeaders()).
this._unrefTimer();

if (headers == undefined) {
headers = {};
} else if (!$isObject(headers) || $isArray(headers)) {
Expand Down Expand Up @@ -2990,6 +3043,10 @@ class ServerHttp2Stream extends Http2Stream {
throw $ERR_HTTP2_TRAILERS_ALREADY_SENT();
}

// Sending the response HEADERS frame is outbound stream activity; refresh
// the per-stream idle timer (mirrors Node's kUpdateTimer in respond()).
this._unrefTimer();
Comment thread
robobun marked this conversation as resolved.

// Raw (flat [name, value, ...] array) headers form: the pairs are encoded
// on the wire in their given order; a default :status is prepended and a
// date header appended when missing. The derived object form (original-case
Expand Down Expand Up @@ -3546,6 +3603,8 @@ class ServerHttp2Session extends Http2Session {
flags: number,
) {
if (!self || typeof stream !== "object" || self.closed || stream.closed) return;
// A HEADERS frame is stream activity; refresh the per-stream idle timer.
stream._unrefTimer();
let rawheaders = headersTuple[0];
let headers = headersTuple[1];
if (self.#strictFieldWhitespaceValidation) {
Expand Down Expand Up @@ -3705,10 +3764,11 @@ class ServerHttp2Session extends Http2Session {
this.destroy(error);
}
#onTimeout() {
const parser = this.#parser;
if (parser) {
parser.forEachStream(emitTimeout);
}
// Per Node.js http2 semantics, a session-level (socket) idle timeout
// emits 'timeout' on the session ONLY. Individual Http2Streams each
// manage their own per-stream idle timers via Http2Stream.setTimeout.
// Do NOT broadcast to every stream here — that would surface spurious
// timeouts on completed/in-flight streams after a short socket idle.
this.emit("timeout");
}
#onDrain() {
Expand Down Expand Up @@ -4072,9 +4132,6 @@ class ServerHttp2Session extends Http2Session {
process.nextTick(emitEventNT, this, "close");
}
}
function emitTimeout(session: ClientHttp2Session) {
session.emit("timeout");
}
function destroySelfOnEnd(this: Http2Stream) {
this.destroy();
}
Expand Down Expand Up @@ -4275,6 +4332,8 @@ class ClientHttp2Session extends Http2Session {
flags: number,
) {
if (!self || typeof stream !== "object" || stream.rstCode) return;
// A HEADERS frame is stream activity; refresh the per-stream idle timer.
stream._unrefTimer();
let rawheaders = headersTuple[0];
let headers = headersTuple[1];
if (self.#strictFieldWhitespaceValidation) {
Expand Down Expand Up @@ -4520,10 +4579,11 @@ class ClientHttp2Session extends Http2Session {
this.destroy(error);
}
#onTimeout() {
const parser = this.#parser;
if (parser) {
parser.forEachStream(emitTimeout);
}
// Per Node.js http2 semantics, a session-level (socket) idle timeout
// emits 'timeout' on the session ONLY. Individual Http2Streams each
// manage their own per-stream idle timers via Http2Stream.setTimeout.
// Do NOT broadcast to every stream here — that would surface spurious
// timeouts on completed/in-flight streams after a short socket idle.
this.emit("timeout");
}
#onDrain() {
Expand Down
162 changes: 162 additions & 0 deletions test/regression/issue/30307.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
// https://github.com/oven-sh/bun/issues/30307

import { describe, expect, it } from "bun:test";
import { isASAN } from "harness";
import { once } from "node:events";
import http2 from "node:http2";

// The ASAN lane is measurably slower than release; scale per-stream
// thresholds so a transient stall on a loaded CI box can't trip a
// correctly-cleared timer during the request.
const SCALE = isASAN ? 4 : 1;

describe("#30307", () => {
it("req.setTimeout does not fire on completed streams after a session-idle gap", async () => {
const server = http2.createServer();
server.on("stream", stream => {
stream.respond({ ":status": 200 });
stream.end("ok");
});

await new Promise<void>(resolve => server.listen(0, resolve));
const port = (server.address() as import("node:net").AddressInfo).port;
const client = http2.connect(`http://localhost:${port}`);
try {
// Warm the session so subsequent requests return quickly and never
// approach the per-stream timeout during their own lifetime.
await once(client, "connect");
{
const warmup = client.request({ ":path": "/" });
warmup.resume();
warmup.end();
await once(warmup, "end");
}

// The per-stream threshold must elapse during the idle gap below.
// The gap is driven by the session's own socket idle-timeout
// firing deterministically, so the timing is event-based.
const STREAM_TIMEOUT_MS = 150 * SCALE;
const IDLE_BARRIER_MS = 2 * STREAM_TIMEOUT_MS;

const timeoutFires: string[] = [];
async function doRequest(label: string) {
const req = client.request({ ":path": "/" });
req.setTimeout(STREAM_TIMEOUT_MS, () => {
timeoutFires.push(label);
});
req.resume();
req.end();
// once() rejects on 'error', so a stream failure surfaces at this
// await rather than as an uncaught emitter throw.
await once(req, "end");
}

await doRequest("req-1");
await doRequest("req-2");
await doRequest("req-3");
await doRequest("req-4");

// Arm a session-level socket idle timeout as a deterministic
// barrier: wait for the session's own 'timeout' event rather than
// sleeping a fixed duration. By the time this fires, the per-stream
// threshold has elapsed for every completed stream above. On the
// buggy path the accumulated per-stream callbacks on the shared
// socket (or the session→streams cascade for completed-but-still-
// tracked streams) fire first and populate timeoutFires.
client.setTimeout(IDLE_BARRIER_MS);
await once(client, "timeout");

// A follow-up request that also succeeds silently.
await doRequest("req-5");

expect(timeoutFires).toEqual([]);
} finally {
client.close();
server.close();
}
});

it("session-level setTimeout does not emit 'timeout' on live streams", async () => {
// A server that never responds keeps client streams open, so they're
// still tracked by the session when the socket idle timer fires.
// On the buggy path, the session #onTimeout did
// `parser.forEachStream(emitTimeout)` and every live stream saw a
// spurious 'timeout' event. Per Node.js, session idle timeouts emit
// on the session only.
const server = http2.createServer();
server.on("stream", _stream => {
// deliberately no response
});

await new Promise<void>(resolve => server.listen(0, resolve));
const port = (server.address() as import("node:net").AddressInfo).port;
const client = http2.connect(`http://localhost:${port}`);
try {
await once(client, "connect");

const streamFired: string[] = [];
const req1 = client.request({ ":path": "/a" });
const req2 = client.request({ ":path": "/b" });
// Swallow the inevitable ERR_HTTP2_STREAM_ERROR on teardown so it
// doesn't surface as an uncaught stream error.
req1.on("error", () => {});
req2.on("error", () => {});
req1.on("timeout", () => streamFired.push("req1"));
req2.on("timeout", () => streamFired.push("req2"));
req1.end();
req2.end();

// Both streams are live (waiting for a response that never comes).
// Arm the session socket idle timer and wait deterministically for
// its 'timeout' event. No per-stream 'timeout' must fire.
client.setTimeout(150 * SCALE);
await once(client, "timeout");

expect(streamFired).toEqual([]);

req1.close(http2.constants.NGHTTP2_CANCEL);
req2.close(http2.constants.NGHTTP2_CANCEL);
} finally {
client.close();
server.close();
}
});

it("req.setTimeout does not fire on a completed stream whose body is never read", async () => {
// A clean END_STREAM response with a buffered, never-consumed body takes
// the client's deferred-destroy path: streamEnd calls markStreamClosed but
// waits for the reader to drain before destroying, so _destroy may never
// run. The per-stream timer must still be disarmed at the close
// transition (markStreamClosed), not only in _destroy.
const server = http2.createServer();
server.on("stream", stream => {
stream.respond({ ":status": 200 });
stream.end("a response body the client never reads");
});

await new Promise<void>(resolve => server.listen(0, resolve));
const port = (server.address() as import("node:net").AddressInfo).port;
const client = http2.connect(`http://localhost:${port}`);
try {
await once(client, "connect");

const fired: string[] = [];
const req = client.request({ ":path": "/" });
req.on("error", () => {});
req.setTimeout(150 * SCALE, () => fired.push("req"));
req.end();
// Deliberately never resume()/read the body: the response stays buffered
// and the stream's _destroy is deferred until a consumer drains it.

// Barrier: a session idle-timeout at 2x the per-stream timeout. If the
// stream timer were left armed it would fire (at 1x) well before this.
client.setTimeout(2 * 150 * SCALE);
await once(client, "timeout");

expect(fired).toEqual([]);
} finally {
client.close();
server.close();
}
});
});
Loading