From 2019adea1e15a5c49c9fa861a53d23df1ac02e73 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Mon, 18 May 2026 00:49:30 +0100 Subject: [PATCH 1/2] fix(server): tear down SSE keepalive + listener on abnormal disconnect MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Both invalidation event streams (`/api/:orgSlug/events` and `/api/:orgSlug/public/events`) bound cleanup only to `ReadableStream.cancel()`. Under abnormal disconnects — LB timeout, proxy kill, client hard close — `cancel()` does not fire, leaving the 30s keepalive `setInterval` running and the emitter listener registered. Each stale connection retains a closure over the controller, slowly leaking memory and file descriptors. Wire cleanup to `c.req.raw.signal` (Hono's per-request `AbortSignal`), which fires on socket close regardless of stream-cancel semantics. Keep `cancel()` as a redundant trigger; both route through an idempotent `runCleanup()` so the second call is a no-op. Extract a small `streamInvalidationEvents` helper since the pattern was duplicated in two places. Refs #782. --- .../src/__tests__/unit/sse-cleanup.test.ts | 132 ++++++++++++++++++ packages/server/src/events/sse.ts | 110 +++++++++++++++ packages/server/src/index.ts | 41 +----- packages/server/src/rest-api.ts | 46 +----- 4 files changed, 250 insertions(+), 79 deletions(-) create mode 100644 packages/server/src/__tests__/unit/sse-cleanup.test.ts create mode 100644 packages/server/src/events/sse.ts diff --git a/packages/server/src/__tests__/unit/sse-cleanup.test.ts b/packages/server/src/__tests__/unit/sse-cleanup.test.ts new file mode 100644 index 000000000..1f9a23a61 --- /dev/null +++ b/packages/server/src/__tests__/unit/sse-cleanup.test.ts @@ -0,0 +1,132 @@ +/** + * Regression test for lobu-ai/lobu#782: SSE keepalive timer + emitter listener + * must be torn down when the underlying request aborts, not only when the + * `ReadableStream.cancel()` callback fires. + * + * Reproduces the abnormal-disconnect path by: + * 1. Wiring an AbortController as `c.req.raw.signal`. + * 2. Letting the stream start (registers the listener + interval). + * 3. Aborting without ever consuming or cancelling the stream. + * 4. Asserting the listener stops receiving events and the interval is cleared. + */ +import { afterEach, beforeEach, describe, expect, it } from 'bun:test'; +import * as invalidationEmitter from '../../events/emitter'; +import { streamInvalidationEvents } from '../../events/sse'; + +function buildContext(signal: AbortSignal) { + return { + req: { raw: { signal } }, + header: () => {}, + body: (stream: ReadableStream) => new Response(stream), + }; +} + +describe('streamInvalidationEvents cleanup', () => { + const originalSetInterval = globalThis.setInterval; + const originalClearInterval = globalThis.clearInterval; + const activeTimers = new Set(); + + beforeEach(() => { + activeTimers.clear(); + globalThis.setInterval = ((fn: () => void, ms: number) => { + const handle = originalSetInterval(fn, ms); + activeTimers.add(handle); + return handle; + }) as typeof setInterval; + globalThis.clearInterval = ((handle: unknown) => { + activeTimers.delete(handle); + return originalClearInterval(handle as Parameters[0]); + }) as typeof clearInterval; + }); + + afterEach(() => { + for (const handle of activeTimers) { + originalClearInterval(handle as Parameters[0]); + } + activeTimers.clear(); + globalThis.setInterval = originalSetInterval; + globalThis.clearInterval = originalClearInterval; + }); + + it('tears down listener + interval when the request signal aborts (no cancel())', async () => { + const orgId = 'org-abort-' + Math.random().toString(36).slice(2); + const ctrl = new AbortController(); + const ctx = buildContext(ctrl.signal); + + const response = streamInvalidationEvents( + ctx as unknown as Parameters[0], + orgId + ); + // Pull the handshake frame so start() has fully run. + const reader = response.body!.getReader(); + await reader.read(); + + expect(activeTimers.size).toBe(1); + + // Sanity: while subscribed, an emit on this org reaches a probe co-listener. + let probeHits = 0; + const unsubProbe = invalidationEmitter.subscribe(orgId, () => { + probeHits++; + }); + invalidationEmitter.emit(orgId, { keys: ['x'] }); + expect(probeHits).toBe(1); + unsubProbe(); + + // Simulate abnormal disconnect: socket aborted, no cancel(). + ctrl.abort(); + + // Abort handler is synchronous; cleanup should have run. + expect(activeTimers.size).toBe(0); + + // After cleanup, the stream's listener is gone — the emitter map should + // no longer carry an entry for this org (subscribe() with size==0 path + // deletes the key after unsubscribe). + let secondProbeHits = 0; + const unsubProbe2 = invalidationEmitter.subscribe(orgId, () => { + secondProbeHits++; + }); + invalidationEmitter.emit(orgId, { keys: ['y'] }); + expect(secondProbeHits).toBe(1); // only the probe sees it + unsubProbe2(); + }); + + it('cleanup is idempotent across abort + cancel()', async () => { + const orgId = 'org-both-' + Math.random().toString(36).slice(2); + const ctrl = new AbortController(); + const ctx = buildContext(ctrl.signal); + + const response = streamInvalidationEvents( + ctx as unknown as Parameters[0], + orgId + ); + const reader = response.body!.getReader(); + await reader.read(); + + expect(activeTimers.size).toBe(1); + + ctrl.abort(); + await reader.cancel().catch(() => {}); + + // Either path tearing down twice must not throw and must leave 0 timers. + expect(activeTimers.size).toBe(0); + }); + + it('cleans up when client cancels before abort', async () => { + const orgId = 'org-cancel-' + Math.random().toString(36).slice(2); + const ctrl = new AbortController(); + const ctx = buildContext(ctrl.signal); + + const response = streamInvalidationEvents( + ctx as unknown as Parameters[0], + orgId + ); + const reader = response.body!.getReader(); + await reader.read(); + + expect(activeTimers.size).toBe(1); + + await reader.cancel(); + + expect(activeTimers.size).toBe(0); + }); +}); diff --git a/packages/server/src/events/sse.ts b/packages/server/src/events/sse.ts new file mode 100644 index 000000000..36f3aa2f1 --- /dev/null +++ b/packages/server/src/events/sse.ts @@ -0,0 +1,110 @@ +/** + * SSE helper for invalidation event streams. + * + * Both the org-scoped (`/api/:orgSlug/events`) and the public + * (`/api/:orgSlug/public/events`) streams previously bound cleanup only to + * `ReadableStream.cancel()`. Under abnormal disconnects (LB timeout, proxy + * kill, client hard close) `cancel()` does not always fire — the keepalive + * `setInterval` keeps running and the emitter listener stays registered, + * leaking memory + descriptors. Issue lobu-ai/lobu#782. + * + * Fix: bind cleanup to the per-request abort signal (`c.req.raw.signal`), + * which fires on socket close regardless of stream-cancel semantics. Keep + * `cancel()` as a redundant trigger; both routes through an idempotent + * `runCleanup()` so the second one is a no-op. + */ +import type { Context } from 'hono'; +import * as invalidationEmitter from './emitter'; + +interface InvalidationEvent { + keys: string[]; +} + +interface StreamOptions { + /** Optional filter; return `null` to drop the event for this subscriber. */ + filter?: (event: InvalidationEvent) => InvalidationEvent | null; +} + +const KEEPALIVE_INTERVAL_MS = 30000; + +export function streamInvalidationEvents( + c: Context, + organizationId: string, + options: StreamOptions = {} +): Response { + const encoder = new TextEncoder(); + const requestSignal = c.req.raw.signal; + + let cleanedUp = false; + let cleanup: (() => void) | null = null; + let onAbort: (() => void) | null = null; + + const runCleanup = () => { + if (cleanedUp) return; + cleanedUp = true; + if (onAbort && requestSignal) { + requestSignal.removeEventListener('abort', onAbort); + onAbort = null; + } + cleanup?.(); + cleanup = null; + }; + + const stream = new ReadableStream({ + start(controller) { + // If the client already aborted between handler invocation and stream + // start, bail out immediately rather than registering a leaking listener. + if (requestSignal?.aborted) { + try { + controller.close(); + } catch { + // already closed + } + return; + } + + controller.enqueue(encoder.encode('event: connected\ndata: {}\n\n')); + + const unsubscribe = invalidationEmitter.subscribe(organizationId, (event) => { + const forwarded = options.filter ? options.filter(event) : event; + if (!forwarded) return; + try { + const data = JSON.stringify(forwarded); + controller.enqueue(encoder.encode(`event: invalidate\ndata: ${data}\n\n`)); + } catch { + // Controller closed — fall through; abort/cancel will tear down. + } + }); + + const keepAlive = setInterval(() => { + try { + controller.enqueue(encoder.encode(': keepalive\n\n')); + } catch { + // Controller closed — cancel/abort will tear down. + } + }, KEEPALIVE_INTERVAL_MS); + + cleanup = () => { + unsubscribe(); + clearInterval(keepAlive); + try { + controller.close(); + } catch { + // already closed by cancel() + } + }; + + onAbort = () => runCleanup(); + requestSignal?.addEventListener('abort', onAbort, { once: true }); + }, + cancel() { + runCleanup(); + }, + }); + + c.header('Content-Type', 'text/event-stream'); + c.header('Cache-Control', 'no-cache'); + c.header('Connection', 'keep-alive'); + + return c.body(stream); +} diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index 60e2e88bb..f7dc52f56 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -24,6 +24,7 @@ import { credentialRoutes } from './auth/routes'; import { connectRoutes } from './connect/routes'; import { getDb } from './db/client'; import * as invalidationEmitter from './events/emitter'; +import { streamInvalidationEvents } from './events/sse'; import { isExcludedSpaPath } from './http/spa-route-filter'; import { restGetAuthProfileForRun, restGetFeedForRun } from './connector-run/routes'; import { agentRoutes } from './lobu/agent-routes'; @@ -1057,45 +1058,7 @@ app.get('/api/:orgSlug/events', mcpAuth, async (c) => { const orgId = c.get('organizationId'); if (!orgId) return c.json({ error: 'Organization context required' }, 401); - const encoder = new TextEncoder(); - let cleanup: (() => void) | null = null; - - const stream = new ReadableStream({ - start(controller) { - controller.enqueue(encoder.encode('event: connected\ndata: {}\n\n')); - - const unsubscribe = invalidationEmitter.subscribe(String(orgId), (event) => { - try { - const data = JSON.stringify(event); - controller.enqueue(encoder.encode(`event: invalidate\ndata: ${data}\n\n`)); - } catch { - // Connection closed - } - }); - - const keepAlive = setInterval(() => { - try { - controller.enqueue(encoder.encode(': keepalive\n\n')); - } catch { - clearInterval(keepAlive); - } - }, 30000); - - cleanup = () => { - unsubscribe(); - clearInterval(keepAlive); - }; - }, - cancel() { - cleanup?.(); - }, - }); - - c.header('Content-Type', 'text/event-stream'); - c.header('Cache-Control', 'no-cache'); - c.header('Connection', 'keep-alive'); - - return c.body(stream); + return streamInvalidationEvents(c, String(orgId)); }); /** diff --git a/packages/server/src/rest-api.ts b/packages/server/src/rest-api.ts index f44e2d3e0..23e54689d 100644 --- a/packages/server/src/rest-api.ts +++ b/packages/server/src/rest-api.ts @@ -8,7 +8,7 @@ import * as Sentry from '@sentry/node'; import type { Context } from 'hono'; import { getDb } from './db/client'; -import * as invalidationEmitter from './events/emitter'; +import { streamInvalidationEvents } from './events/sse'; import type { Env } from './index'; import { EMPTY_SUMMARY, @@ -613,47 +613,13 @@ export async function publicRestEventsStream(c: Context<{ Bindings: Env }>) { const organizationId = await resolvePublicOrganizationId(orgSlug); if (!organizationId) return c.json({ error: 'Not found' }, 404); - const encoder = new TextEncoder(); - let cleanup: (() => void) | null = null; - - const stream = new ReadableStream({ - start(controller) { - controller.enqueue(encoder.encode('event: connected\ndata: {}\n\n')); - - const unsubscribe = invalidationEmitter.subscribe(organizationId, (event) => { - const publicKeys = event.keys.filter((k) => PUBLIC_INVALIDATION_KEYS.has(k)); - if (publicKeys.length === 0) return; - try { - const data = JSON.stringify({ ...event, keys: publicKeys }); - controller.enqueue(encoder.encode(`event: invalidate\ndata: ${data}\n\n`)); - } catch { - // Connection closed - } - }); - - const keepAlive = setInterval(() => { - try { - controller.enqueue(encoder.encode(': keepalive\n\n')); - } catch { - clearInterval(keepAlive); - } - }, 30000); - - cleanup = () => { - unsubscribe(); - clearInterval(keepAlive); - }; - }, - cancel() { - cleanup?.(); + return streamInvalidationEvents(c, organizationId, { + filter: (event) => { + const publicKeys = event.keys.filter((k) => PUBLIC_INVALIDATION_KEYS.has(k)); + if (publicKeys.length === 0) return null; + return { ...event, keys: publicKeys }; }, }); - - c.header('Content-Type', 'text/event-stream'); - c.header('Cache-Control', 'no-cache'); - c.header('Connection', 'keep-alive'); - - return c.body(stream); } /** From 70c4207b4eae9793c81ae40011c1a3dd4f2ff4b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Mon, 18 May 2026 00:57:41 +0100 Subject: [PATCH 2/2] test(sse): strengthen unsubscribe + pre-aborted-signal coverage (pi review) --- .../src/__tests__/unit/sse-cleanup.test.ts | 75 ++++++++++++++++++- 1 file changed, 74 insertions(+), 1 deletion(-) diff --git a/packages/server/src/__tests__/unit/sse-cleanup.test.ts b/packages/server/src/__tests__/unit/sse-cleanup.test.ts index 1f9a23a61..413156fa7 100644 --- a/packages/server/src/__tests__/unit/sse-cleanup.test.ts +++ b/packages/server/src/__tests__/unit/sse-cleanup.test.ts @@ -9,7 +9,7 @@ * 3. Aborting without ever consuming or cancelling the stream. * 4. Asserting the listener stops receiving events and the interval is cleared. */ -import { afterEach, beforeEach, describe, expect, it } from 'bun:test'; +import { afterEach, beforeEach, describe, expect, it, spyOn } from 'bun:test'; import * as invalidationEmitter from '../../events/emitter'; import { streamInvalidationEvents } from '../../events/sse'; @@ -129,4 +129,77 @@ describe('streamInvalidationEvents cleanup', () => { expect(activeTimers.size).toBe(0); }); + + it('calls the emitter unsubscribe exactly once on abort', async () => { + // Pi-review nit #1: prove the stream's invalidation listener is + // unsubscribed, not just that an unrelated probe still receives events. + // Spy on subscribe so we can attribute the returned unsubscribe to the + // stream and count its invocations. + const realSubscribe = invalidationEmitter.subscribe; + let unsubscribeCalls = 0; + const subscribeSpy = spyOn(invalidationEmitter, 'subscribe').mockImplementation( + (organizationId, listener) => { + const inner = realSubscribe(organizationId, listener); + return () => { + unsubscribeCalls++; + inner(); + }; + } + ); + + try { + const orgId = 'org-unsub-' + Math.random().toString(36).slice(2); + const ctrl = new AbortController(); + const ctx = buildContext(ctrl.signal); + + const response = streamInvalidationEvents( + ctx as unknown as Parameters[0], + orgId + ); + const reader = response.body!.getReader(); + await reader.read(); + + expect(subscribeSpy).toHaveBeenCalledTimes(1); + expect(unsubscribeCalls).toBe(0); + + ctrl.abort(); + + expect(unsubscribeCalls).toBe(1); + + // Idempotent: a follow-up cancel must NOT re-call unsubscribe. + await reader.cancel().catch(() => {}); + expect(unsubscribeCalls).toBe(1); + } finally { + subscribeSpy.mockRestore(); + } + }); + + it('does not register a listener or interval if the request is already aborted', async () => { + // Pi-review nit #2: cover the early-aborted-signal branch in + // streamInvalidationEvents.start(). With a pre-aborted signal, start() + // must close the controller without ever calling subscribe() or + // setInterval(). + const subscribeSpy = spyOn(invalidationEmitter, 'subscribe'); + + try { + const orgId = 'org-preaborted-' + Math.random().toString(36).slice(2); + const ctrl = new AbortController(); + ctrl.abort(); + const ctx = buildContext(ctrl.signal); + + const response = streamInvalidationEvents( + ctx as unknown as Parameters[0], + orgId + ); + const reader = response.body!.getReader(); + // The stream should be closed immediately; first read resolves done. + const result = await reader.read(); + expect(result.done).toBe(true); + + expect(subscribeSpy).not.toHaveBeenCalled(); + expect(activeTimers.size).toBe(0); + } finally { + subscribeSpy.mockRestore(); + } + }); });