From d5aef8b0f4a82d2093c2a2257d4d5535cd18e2af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Mon, 18 May 2026 02:53:56 +0100 Subject: [PATCH] fix(server): plug listener leaks on Hono SSE routes via abort bridge MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The invalidation-event streams were fixed in #833: bind cleanup to the per-request AbortSignal so abnormal disconnects (LB idle timeout, proxy kill, client hard close) actually tear the stream down. Two more SSE routes share the same retain shape and were missed: - packages/server/src/gateway/routes/public/agent.ts — the agent SSE channel at /api/v1/agents/:agentId/events - packages/server/src/gateway/gateway/index.ts — the worker SSE channel at /worker/stream Both use Hono's streamSSE/stream helpers. Looking at node_modules/hono/dist/helper/streaming/sse.js, the only place Hono wires signal.abort to stream.abort is inside an isOldBunVersion() branch — on Node + current Bun, only ReadableStream.cancel() triggers the onAbort subscribers. Under abnormal disconnect the cancel path never runs, so: - the 30s heartbeat setInterval keeps firing forever - SseManager keeps the dead connection in its per-agent Set - the `while !stream.aborted` loop runs forever - WorkerConnectionManager keeps the dead writer until the 10-minute stale-cleanup sweeps it (and the loop closure leaks until then) All three retain the closure over the dead stream — the same leak shape #833 fixed for invalidation streams. Credible contributor to the OOMKilled-at-1Gi event tracked in #782. Fix --- New helper `events/sse-abort-bridge.ts` centralizes the bridge so the pattern doesn't drift again. Both routes now: 1. Bail early if requestSignal.aborted is already true at handler entry. 2. Wire stream.onAbort to local cleanup (interval clear + map evict). 3. Bridge requestSignal -> stream.abort() via the helper, so onAbort subscribers run on abnormal disconnects too. 4. detach the signal listener in a finally so the loop exit path doesn't leak it. Reproducer ---------- Wrote a Hono-streamSSE integration test in src/__tests__/unit/sse-abort-bridge.test.ts that reproduces the leak: - Without the bridge (control run): after the per-request abort fires, the heartbeat setInterval is still active and the while-loop never exits within 500ms. - With the bridge: timers=0 and loopExited=true within ~25ms. The control run was kept locally and verified ("OLD reproduce: loopExited = false timers = 1") before being deleted. Test plan --------- - bun test src/__tests__/unit/sse-abort-bridge.test.ts -> 8/8 pass - bun test src/__tests__/unit/sse-cleanup.test.ts -> 5/5 pass (the existing #833 suite, unchanged) - bun run typecheck -> clean - make build-packages -> clean Refs #782. --- .../__tests__/unit/sse-abort-bridge.test.ts | 203 ++++++++++++++++++ .../server/src/events/sse-abort-bridge.ts | 64 ++++++ packages/server/src/gateway/gateway/index.ts | 33 ++- .../server/src/gateway/routes/public/agent.ts | 39 +++- 4 files changed, 331 insertions(+), 8 deletions(-) create mode 100644 packages/server/src/__tests__/unit/sse-abort-bridge.test.ts create mode 100644 packages/server/src/events/sse-abort-bridge.ts diff --git a/packages/server/src/__tests__/unit/sse-abort-bridge.test.ts b/packages/server/src/__tests__/unit/sse-abort-bridge.test.ts new file mode 100644 index 000000000..f22028949 --- /dev/null +++ b/packages/server/src/__tests__/unit/sse-abort-bridge.test.ts @@ -0,0 +1,203 @@ +/** + * Regression test for lobu-ai/lobu#782: the abort-bridge helper used by the + * Hono-streaming SSE routes (agent SSE channel + worker SSE channel) must + * tear the stream down when the per-request `AbortSignal` fires, even when + * `ReadableStream.cancel()` never runs. + * + * The helper is a thin pure-JS bridge — these tests exercise it directly + * against a stub `AbortableStream`, mirroring the contract documented in + * `events/sse-abort-bridge.ts`. + */ +import { describe, expect, it } from 'bun:test'; +import { + bindRequestAbortToStream, + type AbortableStream, +} from '../../events/sse-abort-bridge'; + +function fakeStream(): AbortableStream & { abortCalls: number } { + return { + aborted: false, + closed: false, + abortCalls: 0, + abort() { + this.abortCalls++; + (this as { aborted: boolean }).aborted = true; + }, + }; +} + +describe('bindRequestAbortToStream', () => { + it('calls stream.abort() when the request signal aborts', () => { + const ctrl = new AbortController(); + const stream = fakeStream(); + + const detach = bindRequestAbortToStream(ctrl.signal, stream); + expect(stream.abortCalls).toBe(0); + + ctrl.abort(); + expect(stream.abortCalls).toBe(1); + + // detach is safe to call after abort has fired. + detach(); + }); + + it('fires synchronously if the signal is already aborted at bind time', () => { + const ctrl = new AbortController(); + ctrl.abort(); + const stream = fakeStream(); + + bindRequestAbortToStream(ctrl.signal, stream); + + expect(stream.abortCalls).toBe(1); + }); + + it('detach() prevents the listener from firing later', () => { + const ctrl = new AbortController(); + const stream = fakeStream(); + + const detach = bindRequestAbortToStream(ctrl.signal, stream); + detach(); + + ctrl.abort(); + expect(stream.abortCalls).toBe(0); + }); + + it('detach() is idempotent', () => { + const ctrl = new AbortController(); + const stream = fakeStream(); + + const detach = bindRequestAbortToStream(ctrl.signal, stream); + detach(); + detach(); // must not throw + expect(stream.abortCalls).toBe(0); + }); + + it('does not call abort() if the stream already closed', () => { + const ctrl = new AbortController(); + const stream = fakeStream(); + (stream as { closed: boolean }).closed = true; + + bindRequestAbortToStream(ctrl.signal, stream); + ctrl.abort(); + + expect(stream.abortCalls).toBe(0); + }); + + it('does not call abort() if the stream already aborted', () => { + const ctrl = new AbortController(); + const stream = fakeStream(); + (stream as { aborted: boolean }).aborted = true; + + bindRequestAbortToStream(ctrl.signal, stream); + ctrl.abort(); + + expect(stream.abortCalls).toBe(0); + }); + + it('returns a no-op detach when no signal is provided', () => { + const stream = fakeStream(); + const detach = bindRequestAbortToStream(undefined, stream); + detach(); // must not throw + expect(stream.abortCalls).toBe(0); + }); +}); + +describe('bindRequestAbortToStream + Hono streamSSE (integration)', () => { + /** + * Reproduces the #782 leak shape end-to-end: + * 1. Mount a Hono route that uses `streamSSE` + a per-iteration heartbeat + * interval + a `while !aborted` loop. + * 2. Wire the abort bridge. + * 3. Hit the route and abort the request without consuming the body or + * cancelling the response stream. + * 4. Assert the heartbeat interval has been cleared and the loop has + * exited within a short bounded wait. + * + * Before the fix, the loop runs forever and the interval keeps firing — + * the test would time out the post-abort assertion. + */ + const originalSetInterval = globalThis.setInterval; + const originalClearInterval = globalThis.clearInterval; + + it('tears down heartbeat + loop on request abort', async () => { + const { Hono } = await import('hono'); + const { streamSSE } = await import('hono/streaming'); + + const activeTimers = new Set(); + globalThis.setInterval = ((fn: () => void, ms: number) => { + const handle = originalSetInterval(fn, ms); + activeTimers.add(handle); + return handle; + }) as typeof setInterval; + globalThis.clearInterval = ((handle: unknown) => { + activeTimers.delete(handle); + return originalClearInterval( + handle as Parameters[0] + ); + }) as typeof clearInterval; + + try { + let loopExited = false; + const app = new Hono(); + app.get('/sse', (c) => + streamSSE(c, async (stream) => { + if (c.req.raw.signal?.aborted) return; + + const heartbeat = setInterval(() => { + stream.writeSSE({ event: 'ping', data: 'x' }).catch(() => {}); + }, 50); + + const detach = bindRequestAbortToStream(c.req.raw.signal, stream); + try { + await stream.writeSSE({ event: 'connected', data: '{}' }); + while (!stream.aborted && !stream.closed) { + await stream.sleep(25); + } + } finally { + clearInterval(heartbeat); + detach(); + loopExited = true; + } + }) + ); + + const ctrl = new AbortController(); + const req = new Request('http://localhost/sse', { + method: 'GET', + signal: ctrl.signal, + }); + + // Fire the request without awaiting the body — we want to abort while + // the handler is mid-loop. + const respPromise = app.fetch(req); + // Give the handler time to register interval + write connected event. + await new Promise((r) => setTimeout(r, 75)); + + expect(activeTimers.size).toBeGreaterThanOrEqual(1); + + ctrl.abort(); + + // Wait for the loop to exit (max 500ms — sleep granularity is 25ms). + const deadline = Date.now() + 500; + while (!loopExited && Date.now() < deadline) { + await new Promise((r) => setTimeout(r, 10)); + } + expect(loopExited).toBe(true); + expect(activeTimers.size).toBe(0); + + // Drain the response so we don't leak the body. + try { + const resp = await Promise.resolve(respPromise); + await resp.body?.cancel(); + } catch { + /* ignore */ + } + } finally { + globalThis.setInterval = originalSetInterval; + globalThis.clearInterval = originalClearInterval; + for (const handle of activeTimers) { + originalClearInterval(handle as Parameters[0]); + } + } + }); +}); diff --git a/packages/server/src/events/sse-abort-bridge.ts b/packages/server/src/events/sse-abort-bridge.ts new file mode 100644 index 000000000..4036d9502 --- /dev/null +++ b/packages/server/src/events/sse-abort-bridge.ts @@ -0,0 +1,64 @@ +/** + * `bindRequestAbortToStream` — bridge a Hono per-request `AbortSignal` to a + * Hono `StreamingApi` so abnormal disconnects (LB idle timeout, proxy kill, + * client hard close) actually tear the stream down. + * + * Hono's `streamSSE` / `stream` helpers only invoke `streamWriter.onAbort` + * subscribers when the underlying `ReadableStream.cancel()` runs. On + * Node + current Bun, `cancel()` doesn't fire for abnormal disconnects — + * which leaves any heartbeat `setInterval`, registered listener, or + * `while !aborted` loop running forever. Issue lobu-ai/lobu#782. + * + * The invalidation-event streams already use a raw `ReadableStream` with the + * same bridge wired inline (`events/sse.ts`, fixed in PR #833). The two + * remaining Hono-streaming routes — the agent SSE channel at + * `/api/v1/agents/:agentId/events` and the worker SSE channel at + * `/worker/stream` — need the same bridge. This helper centralizes it so + * the pattern doesn't drift again the next time we add an SSE route. + * + * Caller contract: + * const detach = bindRequestAbortToStream(c.req.raw.signal, stream); + * try { ... } finally { detach(); } + * + * `detach()` is idempotent. The function returns immediately (and `detach` + * is a no-op) if the signal is already aborted — the caller is expected to + * check `signal.aborted` and bail before doing real work. + */ + +/** Minimal shape we need off Hono's `StreamingApi` / `SSEStreamingApi`. */ +export interface AbortableStream { + readonly aborted: boolean; + readonly closed: boolean; + abort(): void; +} + +export function bindRequestAbortToStream( + signal: AbortSignal | undefined, + stream: AbortableStream +): () => void { + if (!signal) return () => {}; + + const onAbort = () => { + if (!stream.aborted && !stream.closed) { + stream.abort(); + } + }; + + // If already aborted, fire synchronously so the caller's loop sees it on + // the next check. addEventListener with `once: true` would also fire if + // re-dispatched, but the platform doesn't re-dispatch already-aborted + // signals — so we have to do it ourselves. + if (signal.aborted) { + onAbort(); + return () => {}; + } + + signal.addEventListener('abort', onAbort, { once: true }); + + let detached = false; + return () => { + if (detached) return; + detached = true; + signal.removeEventListener('abort', onAbort); + }; +} diff --git a/packages/server/src/gateway/gateway/index.ts b/packages/server/src/gateway/gateway/index.ts index 842705915..a197d69cc 100644 --- a/packages/server/src/gateway/gateway/index.ts +++ b/packages/server/src/gateway/gateway/index.ts @@ -9,6 +9,7 @@ import { createLogger, encrypt, verifyWorkerToken } from "@lobu/core"; import type { Context } from "hono"; import { Hono } from "hono"; import { stream } from "hono/streaming"; +import { bindRequestAbortToStream } from "../../events/sse-abort-bridge.js"; import type { ApiKeyProviderModule } from "../auth/api-key-provider-module.js"; import { getRevokedTokenStore } from "../auth/revoked-token-store.js"; import type { McpConfigService } from "../auth/mcp/config-service.js"; @@ -189,10 +190,27 @@ export class WorkerGateway { const httpPortParam = c.req.query("httpPort"); const httpPort = httpPortParam ? parseInt(httpPortParam, 10) : undefined; - // Create an SSE stream + // Create an SSE stream. + // + // Hono's `stream()` only fires `streamWriter.onAbort()` from + // `ReadableStream.cancel()` — which doesn't run on abnormal disconnects + // (LB idle timeout, intermediate proxy kill, worker pod hard exit). On + // Node + current Bun the per-request `AbortSignal` is the only reliable + // trigger. Without bridging it, a stale worker SSE leaks the writer + // closure + `while !isClosed` loop until the 10-minute stale-cleanup + // sweep catches up. Same retain pattern fixed for the invalidation + // streams in #833. Refs #782. + const requestSignal = c.req.raw.signal; + return stream(c, async (streamWriter) => { let isClosed = false; + // If the client already aborted between handler invocation and stream + // body execution, bail out before registering anything. + if (requestSignal?.aborted) { + return; + } + // Create an SSE writer adapter const sseWriter: SSEWriter = { write: (data: string): boolean => { @@ -218,6 +236,11 @@ export class WorkerGateway { }, }; + const detachAbortBridge = bindRequestAbortToStream( + requestSignal, + streamWriter + ); + // Set SSE headers c.header("Content-Type", "text/event-stream"); c.header("Cache-Control", "no-cache"); @@ -269,8 +292,12 @@ export class WorkerGateway { }); // Keep the connection open until the stream is actually aborted. - while (!isClosed) { - await streamWriter.sleep(1000); + try { + while (!isClosed) { + await streamWriter.sleep(1000); + } + } finally { + detachAbortBridge(); } }); } diff --git a/packages/server/src/gateway/routes/public/agent.ts b/packages/server/src/gateway/routes/public/agent.ts index c30f276b4..127dde8fc 100644 --- a/packages/server/src/gateway/routes/public/agent.ts +++ b/packages/server/src/gateway/routes/public/agent.ts @@ -13,6 +13,7 @@ import { } from "@lobu/core"; import type { Context } from "hono"; import { streamSSE } from "hono/streaming"; +import { bindRequestAbortToStream } from "../../../events/sse-abort-bridge.js"; import { z } from "zod"; import type { AgentMetadataStore } from "../../auth/agent-metadata-store.js"; import { getRevokedTokenStore } from "../../auth/revoked-token-store.js"; @@ -869,8 +870,25 @@ export function createAgentApi(config: AgentApiConfig): OpenAPIHono { ); } - // Return SSE stream + // Return SSE stream. + // + // Hono's `streamSSE` only fires `stream.onAbort()` when the underlying + // `ReadableStream.cancel()` runs — which doesn't happen on abnormal + // disconnects (LB idle timeout, intermediate proxy kill, client hard + // close). On Node + current Bun, `signal.abort` is the only reliable + // trigger. Without it the heartbeat interval keeps firing, the + // `sseManager` retains the dead connection, and the `while !aborted` + // loop never exits — the same retain pattern fixed for the invalidation + // streams in #833. Refs #782. + const requestSignal = c.req.raw.signal; + return streamSSE(c, async (stream) => { + // If the client already aborted between handler invocation and stream + // body execution, bail out before registering anything. + if (requestSignal?.aborted) { + return; + } + sseManager.addConnection(sseKey, stream); await stream.writeSSE({ @@ -899,14 +917,25 @@ export function createAgentApi(config: AgentApiConfig): OpenAPIHono { } }, 30000); - stream.onAbort(() => { + let cleanedUp = false; + const cleanup = () => { + if (cleanedUp) return; + cleanedUp = true; clearInterval(heartbeatInterval); sseManager.removeConnection(sseKey, stream); logger.info(`SSE connection closed for session ${sseKey}`); - }); + }; + + stream.onAbort(cleanup); + const detachAbortBridge = bindRequestAbortToStream(requestSignal, stream); - while (true) { - await stream.sleep(1000); + try { + while (!stream.aborted && !stream.closed) { + await stream.sleep(1000); + } + } finally { + detachAbortBridge(); + cleanup(); } }); });