fix(server): tear down SSE keepalive + listener on abnormal disconnect#833
Conversation
Both invalidation event streams (`/api/:orgSlug/events` and `/api/:orgSlug/public/events`) bound cleanup only to `ReadableStream.cancel()`. Under abnormal disconnects — LB timeout, proxy kill, client hard close — `cancel()` does not fire, leaving the 30s keepalive `setInterval` running and the emitter listener registered. Each stale connection retains a closure over the controller, slowly leaking memory and file descriptors. Wire cleanup to `c.req.raw.signal` (Hono's per-request `AbortSignal`), which fires on socket close regardless of stream-cancel semantics. Keep `cancel()` as a redundant trigger; both route through an idempotent `runCleanup()` so the second call is a no-op. Extract a small `streamInvalidationEvents` helper since the pattern was duplicated in two places. Refs #782.
|
Caution Review failedThe pull request is closed. ℹ️ Recent review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Plus Run ID: 📒 Files selected for processing (1)
📝 WalkthroughWalkthroughExtracts SSE streaming into ChangesSSE Stream Refactoring and Integration
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Warning There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure. 🔧 ESLint
ESLint skipped: no ESLint configuration detected in root package.json. To enable, add Comment |
|
Codecov Report✅ All modified and coverable lines are covered by tests. 📢 Thoughts on this report? Let us know! |
There was a problem hiding this comment.
Actionable comments posted: 3
🧹 Nitpick comments (1)
packages/server/src/__tests__/unit/sse-cleanup.test.ts (1)
81-90: ⚡ Quick winConsider strengthening the listener-removal verification.
The current approach verifies that a new probe listener receives events after abort, but doesn't directly prove the stream's listener was removed. A stronger test would verify that the stream's reader itself stops receiving events after cleanup.
💡 Suggested enhancement
After aborting, attempt to read from the original reader and verify that subsequent emissions don't reach it:
ctrl.abort(); // Abort handler is synchronous; cleanup should have run. expect(activeTimers.size).toBe(0); + // Verify the stream's reader is closed or doesn't receive new events + invalidationEmitter.emit(orgId, { keys: ['post-abort'] }); + const postAbortRead = await reader.read(); + // Should be done (stream closed) or shouldn't contain the post-abort event + expect(postAbortRead.done || !postAbortRead.value?.includes('post-abort')).toBe(true); // After cleanup, the stream's listener is gone — the emitter map should🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/server/src/__tests__/unit/sse-cleanup.test.ts` around lines 81 - 90, The test currently only checks a new probe listener receives events after cleanup but doesn't prove the original stream listener stopped; update the test to, after calling abort() (or abortController.abort()) and after calling unsubProbe2(), obtain the original stream's reader via stream.getReader() (or reuse the existing reader reference), attempt a reader.read() (or read repeatedly / with a short timeout) after emitting further events with invalidationEmitter.emit(orgId, ...) and assert that the reader does not receive any new chunks (i.e., reads return done=true or no payloads), thereby directly verifying the stream's listener was removed; reference invalidationEmitter.subscribe(), invalidationEmitter.emit(), abort()/abortController, and reader.read() when adding this check.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@packages/server/src/__tests__/unit/sse-cleanup.test.ts`:
- Around line 24-131: Add a fourth unit test in this suite to cover the
"already-aborted signal" path: create an AbortController, call ctrl.abort()
before building ctx with buildContext(ctrl.signal), then call
streamInvalidationEvents(...) and assert that activeTimers.size is 0 (no
interval registered) and that the response body reader immediately returns done
(await reader.read() -> done true); reference the existing patterns in the file
(streamInvalidationEvents, buildContext, activeTimers, reader.read()) so the new
test mirrors the other cases.
- Line 108: The test currently swallows all errors from reader.cancel() which
can hide real failures; either remove the suppression so the test awaits
reader.cancel() directly (letting any unexpected error fail the test) or, if a
specific error is expected in this scenario, replace the suppression with an
explicit assertion such as using expect(reader.cancel()).rejects.toThrow(...) so
the exact error is verified; update the call site where reader.cancel() is
invoked in the test to reflect one of these two approaches.
In `@packages/server/src/events/sse.ts`:
- Around line 57-99: Register the abort listener before or immediately around
setting up resources and then re-check the signal so a race where abort fires
between the initial check and listener registration is closed: move or add
requestSignal?.addEventListener('abort', onAbort, { once: true }) before
starting the subscription/keepalive (or add a synchronous check right after
registering the listener), and if requestSignal?.aborted is true call
runCleanup() and return; ensure onAbort points to runCleanup and cleanup still
unsubscribes invalidationEmitter.subscribe, clears keepAlive, and closes
controller.
---
Nitpick comments:
In `@packages/server/src/__tests__/unit/sse-cleanup.test.ts`:
- Around line 81-90: The test currently only checks a new probe listener
receives events after cleanup but doesn't prove the original stream listener
stopped; update the test to, after calling abort() (or abortController.abort())
and after calling unsubProbe2(), obtain the original stream's reader via
stream.getReader() (or reuse the existing reader reference), attempt a
reader.read() (or read repeatedly / with a short timeout) after emitting further
events with invalidationEmitter.emit(orgId, ...) and assert that the reader does
not receive any new chunks (i.e., reads return done=true or no payloads),
thereby directly verifying the stream's listener was removed; reference
invalidationEmitter.subscribe(), invalidationEmitter.emit(),
abort()/abortController, and reader.read() when adding this check.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro Plus
Run ID: f5e0fcd1-56d9-426f-b7ff-519aba7d8360
📒 Files selected for processing (4)
packages/server/src/__tests__/unit/sse-cleanup.test.tspackages/server/src/events/sse.tspackages/server/src/index.tspackages/server/src/rest-api.ts
| expect(activeTimers.size).toBe(1); | ||
|
|
||
| ctrl.abort(); | ||
| await reader.cancel().catch(() => {}); |
There was a problem hiding this comment.
Error suppression may hide unexpected failures.
The test catches and ignores all errors from cancel(). If the cleanup is truly idempotent, no error should occur. Suppressing errors might hide bugs in the implementation where cancel() unexpectedly throws after abort.
🔍 Suggested fix
Remove the error suppression to let unexpected errors fail the test:
ctrl.abort();
- await reader.cancel().catch(() => {});
+ await reader.cancel();Alternatively, if cancel() is expected to throw in this scenario, verify the specific error:
ctrl.abort();
- await reader.cancel().catch(() => {});
+ await reader.cancel().catch((err) => {
+ expect(err).toBeDefined(); // or check specific error type
+ });📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| await reader.cancel().catch(() => {}); | |
| await reader.cancel(); |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/server/src/__tests__/unit/sse-cleanup.test.ts` at line 108, The test
currently swallows all errors from reader.cancel() which can hide real failures;
either remove the suppression so the test awaits reader.cancel() directly
(letting any unexpected error fail the test) or, if a specific error is expected
in this scenario, replace the suppression with an explicit assertion such as
using expect(reader.cancel()).rejects.toThrow(...) so the exact error is
verified; update the call site where reader.cancel() is invoked in the test to
reflect one of these two approaches.
| if (requestSignal?.aborted) { | ||
| try { | ||
| controller.close(); | ||
| } catch { | ||
| // already closed | ||
| } | ||
| return; | ||
| } | ||
|
|
||
| controller.enqueue(encoder.encode('event: connected\ndata: {}\n\n')); | ||
|
|
||
| const unsubscribe = invalidationEmitter.subscribe(organizationId, (event) => { | ||
| const forwarded = options.filter ? options.filter(event) : event; | ||
| if (!forwarded) return; | ||
| try { | ||
| const data = JSON.stringify(forwarded); | ||
| controller.enqueue(encoder.encode(`event: invalidate\ndata: ${data}\n\n`)); | ||
| } catch { | ||
| // Controller closed — fall through; abort/cancel will tear down. | ||
| } | ||
| }); | ||
|
|
||
| const keepAlive = setInterval(() => { | ||
| try { | ||
| controller.enqueue(encoder.encode(': keepalive\n\n')); | ||
| } catch { | ||
| // Controller closed — cancel/abort will tear down. | ||
| } | ||
| }, KEEPALIVE_INTERVAL_MS); | ||
|
|
||
| cleanup = () => { | ||
| unsubscribe(); | ||
| clearInterval(keepAlive); | ||
| try { | ||
| controller.close(); | ||
| } catch { | ||
| // already closed by cancel() | ||
| } | ||
| }; | ||
|
|
||
| onAbort = () => runCleanup(); | ||
| requestSignal?.addEventListener('abort', onAbort, { once: true }); | ||
| }, |
There was a problem hiding this comment.
🧩 Analysis chain
🌐 Web query:
In JavaScript AbortSignal semantics, if signal.aborted === truebeforeaddEventListener('abort', ...) is called, is that listener invoked automatically, or can the abort event be missed?
💡 Result:
In JavaScript, if an AbortSignal is already aborted (i.e., signal.aborted is true) before you call addEventListener('abort',...), the event listener is not automatically invoked [1]. The 'abort' event is fired only once, at the moment the abort occurs [2][1]. If you attach a listener after that moment, the event has already passed, and your listener will not be triggered [1]. To handle this scenario correctly, you should always check the state of the signal immediately after adding the listener or before performing an operation [1]. A common pattern is to check signal.aborted or call signal.throwIfAborted to handle the already-aborted state synchronously [3][1]. For example: if (signal.aborted) { // Handle the already-aborted state immediately handleAbort(signal.reason); } else { // Attach the listener for future aborts signal.addEventListener('abort', => handleAbort(signal.reason)); } Alternatively, many APIs that accept an AbortSignal (like fetch) handle this check internally, automatically rejecting the associated promise if the signal is already aborted [1].
Citations:
- 1: https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal
- 2: https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal/abort_event
- 3: https://stackoverflow.com/questions/78747707/how-to-elegantly-manage-abortsignal-event-listeners-when-implementing-abortable
Close the abort-listener race window to avoid resource leaks.
Between line 57 and line 98, an abort can occur after the initial aborted check but before the listener is registered. Since AbortSignal fires the abort event only once, a listener registered after the abort has already fired will never be invoked, leaving cleanup skipped and resources (controller, interval, subscription) dangling.
Suggested patch
onAbort = () => runCleanup();
requestSignal?.addEventListener('abort', onAbort, { once: true });
+ // Handle aborts that happen between the early `aborted` check
+ // and listener registration.
+ if (requestSignal?.aborted) {
+ runCleanup();
+ return;
+ }📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| if (requestSignal?.aborted) { | |
| try { | |
| controller.close(); | |
| } catch { | |
| // already closed | |
| } | |
| return; | |
| } | |
| controller.enqueue(encoder.encode('event: connected\ndata: {}\n\n')); | |
| const unsubscribe = invalidationEmitter.subscribe(organizationId, (event) => { | |
| const forwarded = options.filter ? options.filter(event) : event; | |
| if (!forwarded) return; | |
| try { | |
| const data = JSON.stringify(forwarded); | |
| controller.enqueue(encoder.encode(`event: invalidate\ndata: ${data}\n\n`)); | |
| } catch { | |
| // Controller closed — fall through; abort/cancel will tear down. | |
| } | |
| }); | |
| const keepAlive = setInterval(() => { | |
| try { | |
| controller.enqueue(encoder.encode(': keepalive\n\n')); | |
| } catch { | |
| // Controller closed — cancel/abort will tear down. | |
| } | |
| }, KEEPALIVE_INTERVAL_MS); | |
| cleanup = () => { | |
| unsubscribe(); | |
| clearInterval(keepAlive); | |
| try { | |
| controller.close(); | |
| } catch { | |
| // already closed by cancel() | |
| } | |
| }; | |
| onAbort = () => runCleanup(); | |
| requestSignal?.addEventListener('abort', onAbort, { once: true }); | |
| }, | |
| if (requestSignal?.aborted) { | |
| try { | |
| controller.close(); | |
| } catch { | |
| // already closed | |
| } | |
| return; | |
| } | |
| controller.enqueue(encoder.encode('event: connected\ndata: {}\n\n')); | |
| const unsubscribe = invalidationEmitter.subscribe(organizationId, (event) => { | |
| const forwarded = options.filter ? options.filter(event) : event; | |
| if (!forwarded) return; | |
| try { | |
| const data = JSON.stringify(forwarded); | |
| controller.enqueue(encoder.encode(`event: invalidate\ndata: ${data}\n\n`)); | |
| } catch { | |
| // Controller closed — fall through; abort/cancel will tear down. | |
| } | |
| }); | |
| const keepAlive = setInterval(() => { | |
| try { | |
| controller.enqueue(encoder.encode(': keepalive\n\n')); | |
| } catch { | |
| // Controller closed — cancel/abort will tear down. | |
| } | |
| }, KEEPALIVE_INTERVAL_MS); | |
| cleanup = () => { | |
| unsubscribe(); | |
| clearInterval(keepAlive); | |
| try { | |
| controller.close(); | |
| } catch { | |
| // already closed by cancel() | |
| } | |
| }; | |
| onAbort = () => runCleanup(); | |
| requestSignal?.addEventListener('abort', onAbort, { once: true }); | |
| // Handle aborts that happen between the early `aborted` check | |
| // and listener registration. | |
| if (requestSignal?.aborted) { | |
| runCleanup(); | |
| return; | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/server/src/events/sse.ts` around lines 57 - 99, Register the abort
listener before or immediately around setting up resources and then re-check the
signal so a race where abort fires between the initial check and listener
registration is closed: move or add requestSignal?.addEventListener('abort',
onAbort, { once: true }) before starting the subscription/keepalive (or add a
synchronous check right after registering the listener), and if
requestSignal?.aborted is true call runCleanup() and return; ensure onAbort
points to runCleanup and cleanup still unsubscribes
invalidationEmitter.subscribe, clears keepAlive, and closes controller.
pi review summaryRan `pi` on this PR. Verdict: approve-with-nits. Findings
Pi's confirmed correctness checks (no findings)
Local test status``` `make typecheck` — clean. |
* docs(agents): cross-repo dispatch pattern + e2e-before-merge gate - Owletto agents work in standalone ~/Code/owletto clone, not packages/owletto submodule (avoids inherited origin → wrong remote). - Don't pass "REPO: /absolute/path" in dispatch prompts — agents cd out of their isolation worktree. - Add e2e red→fix→green hard gate before opening bug-fix PRs. Bail if you can't reproduce. Motivated by the 2026-05-17 triage: both #781 and #782 agents hit the origin misconfig, and all three PRs (#833, #835, owletto#160) shipped without a reproducer. * docs(agents): clarify Pi reference per coderabbit review
#855) Both caches were "lazy refresh on read" — they update an entry's expiresAt when the same agentId is looked up again, but never delete entries for agentIds that are never re-queried. Net: cache size grows monotonically with distinct agentIds the gateway has ever seen, bounded only by the pod's lifetime. In practice the growth rate is small (~200 bytes per distinct agentId, hundreds-to-thousands of agents per day) and almost certainly NOT the cause of the 1 Gi OOM that prompted #782 — but it's still a genuine bound-less Map that ideally cleans up after itself. Adds a tiny cacheSet helper with a 1024-entry cap that evicts the oldest insertion (Maps iterate in insertion order, so size-1 peek-and-delete is O(1)). Test exercises 2048 distinct lookups and asserts both caches stay <= 1024. Refs #782 — hardening, not root-cause fix. SSE keepalive teardown (#833) and in-memory pending-interactions removal (#834) remain the most likely actual OOM fixes.
…845) The invalidation-event streams were fixed in #833: bind cleanup to the per-request AbortSignal so abnormal disconnects (LB idle timeout, proxy kill, client hard close) actually tear the stream down. Two more SSE routes share the same retain shape and were missed: - packages/server/src/gateway/routes/public/agent.ts — the agent SSE channel at /api/v1/agents/:agentId/events - packages/server/src/gateway/gateway/index.ts — the worker SSE channel at /worker/stream Both use Hono's streamSSE/stream helpers. Looking at node_modules/hono/dist/helper/streaming/sse.js, the only place Hono wires signal.abort to stream.abort is inside an isOldBunVersion() branch — on Node + current Bun, only ReadableStream.cancel() triggers the onAbort subscribers. Under abnormal disconnect the cancel path never runs, so: - the 30s heartbeat setInterval keeps firing forever - SseManager keeps the dead connection in its per-agent Set - the `while !stream.aborted` loop runs forever - WorkerConnectionManager keeps the dead writer until the 10-minute stale-cleanup sweeps it (and the loop closure leaks until then) All three retain the closure over the dead stream — the same leak shape #833 fixed for invalidation streams. Credible contributor to the OOMKilled-at-1Gi event tracked in #782. Fix --- New helper `events/sse-abort-bridge.ts` centralizes the bridge so the pattern doesn't drift again. Both routes now: 1. Bail early if requestSignal.aborted is already true at handler entry. 2. Wire stream.onAbort to local cleanup (interval clear + map evict). 3. Bridge requestSignal -> stream.abort() via the helper, so onAbort subscribers run on abnormal disconnects too. 4. detach the signal listener in a finally so the loop exit path doesn't leak it. Reproducer ---------- Wrote a Hono-streamSSE integration test in src/__tests__/unit/sse-abort-bridge.test.ts that reproduces the leak: - Without the bridge (control run): after the per-request abort fires, the heartbeat setInterval is still active and the while-loop never exits within 500ms. - With the bridge: timers=0 and loopExited=true within ~25ms. The control run was kept locally and verified ("OLD reproduce: loopExited = false timers = 1") before being deleted. Test plan --------- - bun test src/__tests__/unit/sse-abort-bridge.test.ts -> 8/8 pass - bun test src/__tests__/unit/sse-cleanup.test.ts -> 5/5 pass (the existing #833 suite, unchanged) - bun run typecheck -> clean - make build-packages -> clean Refs #782.
…nto MCP heartbeat (#864) * fix(server): close SSE bridge registration-order races + wire abort into MCP heartbeat Follow-up to #845 — codex audit caught three race windows the original PR missed. 1. `gateway/gateway/index.ts`: `WorkerGateway.handleStreamConnection` registered the `sseWriter.onClose` cleanup AFTER awaiting `pauseWorker` / `addConnection` / `registerWorker`. An abort fired in that window left a dead writer registered in `WorkerConnectionManager`. Fix: idempotent cleanup latch wired BEFORE the async setup; the abort bridge routes through it, and post-await checkpoints short-circuit when the latch tripped. 2. `gateway/routes/public/agent.ts`: the agent events SSE route called `sseManager.addConnection(...)` + initial `writeSSE` / backlog writes BEFORE wiring `stream.onAbort(cleanup)` and the abort bridge. An abort in that window leaked the manager registration. Fix: same idempotent latch — cleanup + abort bridge registered FIRST, manager.add() second, async writes inside the latch's try-finally. 3. `mcp-handler.ts`: `withSSEHeartbeat` wrapped SSE responses with a heartbeat `setInterval` but never bound the inbound request's `AbortSignal`. Abnormal disconnects (LB timeout, proxy kill, client hard-close) left the interval running forever — the same root cause as #833/#845. Fix: thread `req.signal` through `withSSEHeartbeat` and bind it to the writable via `bindRequestAbortToStream`. Test plan: existing `sse-abort-bridge.test.ts` + new `sse-bridge-registration-order.test.ts` covering each race window and a direct check that the MCP heartbeat interval clears on abrupt abort. * fix(mcp-handler): create heartbeat interval before binding abort signal When the inbound request signal is already aborted at withSSEHeartbeat entry, bindRequestAbortToStream() synchronously calls adapter.abort() -> abortWriter() and latches 'terminated=true'. If setInterval() runs AFTER the bind in that case, intervalId is undefined inside abortWriter() so clearInterval() never fires and the heartbeat timer leaks. Swap the order: setInterval first (intervalId defined), then bind. The pre-aborted path now clears the interval immediately. Regression test added covers the pre-aborted bind window.
Summary
Refs #782.
Both invalidation event SSE streams previously bound cleanup only to
ReadableStream.cancel():packages/server/src/index.ts:1056—/api/:orgSlug/events(admin/member stream)packages/server/src/rest-api.ts:609—/api/:orgSlug/public/events(public stream)Under abnormal disconnects — LB timeout, intermediate proxy kill, client hard close —
cancel()does not fire. That leaves:setIntervalrunning on a dead controller forever, andinvalidationEmitter.subscribe) still registered in the per-orgSetinpackages/server/src/events/emitter.ts.Both retain closures over the dead
ReadableStreamDefaultController— a slow leak that compounds with every flaky/timed-out SSE connection and is a credible contributor to the OOMKilled-at-1Gi event tracked in #782.Fix
Bind cleanup to Hono's per-request
AbortSignal(c.req.raw.signal), which fires on socket close regardless of stream-cancel semantics. Keep thecancel()callback as a redundant trigger so we cover normal client-side aborts too; both routes run through an idempotentrunCleanup()that is safe to call twice.The pattern was duplicated in two places, so it now lives in a small helper:
packages/server/src/events/sse.ts— newstreamInvalidationEvents(c, orgId, { filter? })helper. HandlesAbortSignalwiring, listener registration, keepalive, controller close, and idempotent teardown in one place. The public stream uses the optionalfilterto apply thePUBLIC_INVALIDATION_KEYSallowlist.packages/server/src/index.ts—/api/:orgSlug/eventsreduced to a 4-line handler that delegates to the helper.packages/server/src/rest-api.ts—publicRestEventsStreamreduced similarly.There is also a defensive early-return if
requestSignal.abortedis already true whenstart()runs (a request that aborted between handler invocation and stream pull-start).How abnormal disconnect is handled now
cancel()runs → cleanup ✓cancel()never fires → leakcancel()may not run → leakScope
SSE teardown only — the broader heap-leak hunt in #782 stays open until prod confirms (a 24h+ run without a regression or a fresh heap snapshot diff). The other prime suspect from #782 —
recordLifecycleEvent/metric_seriestrain — is not touched here.No backwards-compat shims. No refactor of the emitter itself. No new dependencies.
Test plan
make typecheck— passes (stricttsc --noEmitacross the workspace).packages/server/src/__tests__/unit/sse-cleanup.test.ts— 3 cases:AbortControllerasc.req.raw.signal, lets the streamstart()run, asserts one active interval and one emitter listener; then callsctrl.abort()and asserts the interval is cleared and the listener is unsubscribed.bun test src/__tests__/unit/sse-cleanup.test.ts→ 3 pass.summaries-app-lobu-appRSS over the next 24h; if the slope flattens (vs. the 1Gi → 2Gi mitigation we have now), confirm the leak source is SSE. Heap-snapshot diff via the flags already plumbed in lobu-ai/owletto-web#140 if it doesn't.Summary by CodeRabbit
Tests
Bug Fixes
Refactor