fix(server): plug additional listener leaks identified during #782 hunt#845
Conversation
The invalidation-event streams were fixed in #833: bind cleanup to the per-request AbortSignal so abnormal disconnects (LB idle timeout, proxy kill, client hard close) actually tear the stream down. Two more SSE routes share the same retain shape and were missed: - packages/server/src/gateway/routes/public/agent.ts — the agent SSE channel at /api/v1/agents/:agentId/events - packages/server/src/gateway/gateway/index.ts — the worker SSE channel at /worker/stream Both use Hono's streamSSE/stream helpers. Looking at node_modules/hono/dist/helper/streaming/sse.js, the only place Hono wires signal.abort to stream.abort is inside an isOldBunVersion() branch — on Node + current Bun, only ReadableStream.cancel() triggers the onAbort subscribers. Under abnormal disconnect the cancel path never runs, so: - the 30s heartbeat setInterval keeps firing forever - SseManager keeps the dead connection in its per-agent Set - the `while !stream.aborted` loop runs forever - WorkerConnectionManager keeps the dead writer until the 10-minute stale-cleanup sweeps it (and the loop closure leaks until then) All three retain the closure over the dead stream — the same leak shape #833 fixed for invalidation streams. Credible contributor to the OOMKilled-at-1Gi event tracked in #782. Fix --- New helper `events/sse-abort-bridge.ts` centralizes the bridge so the pattern doesn't drift again. Both routes now: 1. Bail early if requestSignal.aborted is already true at handler entry. 2. Wire stream.onAbort to local cleanup (interval clear + map evict). 3. Bridge requestSignal -> stream.abort() via the helper, so onAbort subscribers run on abnormal disconnects too. 4. detach the signal listener in a finally so the loop exit path doesn't leak it. Reproducer ---------- Wrote a Hono-streamSSE integration test in src/__tests__/unit/sse-abort-bridge.test.ts that reproduces the leak: - Without the bridge (control run): after the per-request abort fires, the heartbeat setInterval is still active and the while-loop never exits within 500ms. - With the bridge: timers=0 and loopExited=true within ~25ms. The control run was kept locally and verified ("OLD reproduce: loopExited = false timers = 1") before being deleted. Test plan --------- - bun test src/__tests__/unit/sse-abort-bridge.test.ts -> 8/8 pass - bun test src/__tests__/unit/sse-cleanup.test.ts -> 5/5 pass (the existing #833 suite, unchanged) - bun run typecheck -> clean - make build-packages -> clean Refs #782.
|
Warning Rate limit exceeded
You’ve run out of usage credits. Purchase more in the billing tab. ⌛ How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Plus Run ID: 📒 Files selected for processing (4)
✨ Finishing Touches🧪 Generate unit tests (beta)
Comment |
|
Codecov Report✅ All modified and coverable lines are covered by tests. 📢 Thoughts on this report? Let us know! |
…nto MCP heartbeat (#864) * fix(server): close SSE bridge registration-order races + wire abort into MCP heartbeat Follow-up to #845 — codex audit caught three race windows the original PR missed. 1. `gateway/gateway/index.ts`: `WorkerGateway.handleStreamConnection` registered the `sseWriter.onClose` cleanup AFTER awaiting `pauseWorker` / `addConnection` / `registerWorker`. An abort fired in that window left a dead writer registered in `WorkerConnectionManager`. Fix: idempotent cleanup latch wired BEFORE the async setup; the abort bridge routes through it, and post-await checkpoints short-circuit when the latch tripped. 2. `gateway/routes/public/agent.ts`: the agent events SSE route called `sseManager.addConnection(...)` + initial `writeSSE` / backlog writes BEFORE wiring `stream.onAbort(cleanup)` and the abort bridge. An abort in that window leaked the manager registration. Fix: same idempotent latch — cleanup + abort bridge registered FIRST, manager.add() second, async writes inside the latch's try-finally. 3. `mcp-handler.ts`: `withSSEHeartbeat` wrapped SSE responses with a heartbeat `setInterval` but never bound the inbound request's `AbortSignal`. Abnormal disconnects (LB timeout, proxy kill, client hard-close) left the interval running forever — the same root cause as #833/#845. Fix: thread `req.signal` through `withSSEHeartbeat` and bind it to the writable via `bindRequestAbortToStream`. Test plan: existing `sse-abort-bridge.test.ts` + new `sse-bridge-registration-order.test.ts` covering each race window and a direct check that the MCP heartbeat interval clears on abrupt abort. * fix(mcp-handler): create heartbeat interval before binding abort signal When the inbound request signal is already aborted at withSSEHeartbeat entry, bindRequestAbortToStream() synchronously calls adapter.abort() -> abortWriter() and latches 'terminated=true'. If setInterval() runs AFTER the bind in that case, intervalId is undefined inside abortWriter() so clearInterval() never fires and the heartbeat timer leaks. Swap the order: setInterval first (intervalId defined), then bind. The pre-aborted path now clears the interval immediately. Regression test added covers the pre-aborted bind window.
Summary
Refs #782.
PR #833 wired the per-request
AbortSignalto cleanup for the twoinvalidation-event SSE streams — but two more SSE routes share the
same retain shape and were missed:
packages/server/src/gateway/routes/public/agent.ts— the agent SSE channel atGET /api/v1/agents/:agentId/eventspackages/server/src/gateway/gateway/index.ts— the worker SSE channel atGET /worker/streamBoth use Hono's
streamSSE/streamhelpers. Readingnode_modules/hono/dist/helper/streaming/sse.js, Hono only wiressignal -> stream.abort()inside anisOldBunVersion()branch. On Node + current Bun the only thing that triggersonAbortsubscribers isReadableStream.cancel()— which, under LB idle timeout / proxy kill / client hard close, does not fire.Concretely, under abnormal disconnect on each of these routes:
setIntervalkeeps firing forever,SseManager(agent stream) keeps the dead connection in its per-agentSet,WorkerConnectionManager(worker stream) keeps the dead writer until the 10-minute stale-cleanup sweeps it,while (!stream.aborted) await stream.sleep(1000)loop never exits.All three retain the closure over the dead stream — the same leak shape #833 fixed for invalidation streams. Credible contributor to the OOMKilled-at-1Gi event tracked in #782.
Fix
New helper
packages/server/src/events/sse-abort-bridge.tscentralizes the abort bridge so the pattern doesn't drift again the next time we add an SSE route. Both Hono-streaming routes now:requestSignal.abortedis already true at handler entry.stream.onAbortto local cleanup (interval clear + map evict).requestSignal -> stream.abort()via the helper, so theonAbortsubscribers run on abnormal disconnects too.detach()the signal listener in afinallyso the natural exit path doesn't leak it.Reproducer
Added
src/__tests__/unit/sse-abort-bridge.test.tswith a HonostreamSSE-mounted integration test:AbortController.abort()fires, the heartbeatsetIntervalis still active and thewhile-!abortedloop has not exited within a 500ms grace window. Log line from the control run:OLD reproduce: loopExited = false timers = 1.activeTimers.sizereturns to 0.The control test was kept locally to validate the failure mode and then deleted from the repo to keep the committed suite tight; the green assertion is what's tested in CI.
Test plan
bun test src/__tests__/unit/sse-abort-bridge.test.ts-> 8/8 passbun test src/__tests__/unit/sse-cleanup.test.ts-> 5/5 pass (the existing fix(server): tear down SSE keepalive + listener on abnormal disconnect #833 suite, unchanged)bun run typecheck-> cleanmake build-packages-> cleansummaries-app-lobu-appRSS slope over 24h post-deploy and confirm it flattens, or grab a heap-snapshot diff via the flags plumbed in lobu-ai/owletto-web#140.Scope
SSE-listener teardown only. The broader heap-leak hunt in #782 stays open until prod confirms. Other prime suspects from #782 not touched here:
recordLifecycleEvent/metric_seriestrain — bounded by inspection (single INSERT, 5s statement_timeout, 2000-row cap; no accumulating state on the call path).chat-state-adapter— no such file in the codebase; the path lives in the chat-SDK package and uses Postgres-backed state (chat_state_lists).