fix(server): close SSE bridge registration-order races + wire abort into MCP heartbeat#864
Conversation
…nto MCP heartbeat Follow-up to #845 — codex audit caught three race windows the original PR missed. 1. `gateway/gateway/index.ts`: `WorkerGateway.handleStreamConnection` registered the `sseWriter.onClose` cleanup AFTER awaiting `pauseWorker` / `addConnection` / `registerWorker`. An abort fired in that window left a dead writer registered in `WorkerConnectionManager`. Fix: idempotent cleanup latch wired BEFORE the async setup; the abort bridge routes through it, and post-await checkpoints short-circuit when the latch tripped. 2. `gateway/routes/public/agent.ts`: the agent events SSE route called `sseManager.addConnection(...)` + initial `writeSSE` / backlog writes BEFORE wiring `stream.onAbort(cleanup)` and the abort bridge. An abort in that window leaked the manager registration. Fix: same idempotent latch — cleanup + abort bridge registered FIRST, manager.add() second, async writes inside the latch's try-finally. 3. `mcp-handler.ts`: `withSSEHeartbeat` wrapped SSE responses with a heartbeat `setInterval` but never bound the inbound request's `AbortSignal`. Abnormal disconnects (LB timeout, proxy kill, client hard-close) left the interval running forever — the same root cause as #833/#845. Fix: thread `req.signal` through `withSSEHeartbeat` and bind it to the writable via `bindRequestAbortToStream`. Test plan: existing `sse-abort-bridge.test.ts` + new `sse-bridge-registration-order.test.ts` covering each race window and a direct check that the MCP heartbeat interval clears on abrupt abort.
|
Warning Rate limit exceeded
You’ve run out of usage credits. Purchase more in the billing tab. ⌛ How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Plus Run ID: 📒 Files selected for processing (4)
✨ Finishing Touches🧪 Generate unit tests (beta)
Comment |
|
Codecov Report✅ All modified and coverable lines are covered by tests. 📢 Thoughts on this report? Let us know! |
When the inbound request signal is already aborted at withSSEHeartbeat entry, bindRequestAbortToStream() synchronously calls adapter.abort() -> abortWriter() and latches 'terminated=true'. If setInterval() runs AFTER the bind in that case, intervalId is undefined inside abortWriter() so clearInterval() never fires and the heartbeat timer leaks. Swap the order: setInterval first (intervalId defined), then bind. The pre-aborted path now clears the interval immediately. Regression test added covers the pre-aborted bind window.
Follow-up to #845. Codex audit caught three race windows the original PR missed. Shipped as one bundled fix.
Findings addressed
1.
packages/server/src/gateway/gateway/index.ts:239— worker SSEbindRequestAbortToStream(requestSignal, stream)was called BEFORE the asyncpauseWorker/registerWorker/resumeWorkerblock, but thesseWriter.onClosecleanup subscriber that removes the writer fromWorkerConnectionManagerwasn't registered until after that async work (around:280). IfrequestSignalaborted during the async setup window, the stream was aborted but no cleanup subscriber existed yet — a dead writer could still be added at:266and never removed.Fix: register the abort/close cleanup subscriber FIRST (guarding against the dead-writer-add), THEN bind the abort bridge, THEN do the async setup. Cleanup is guarded with an idempotent latch so it's safe to fire before or after the writer is added. Post-await checkpoints short-circuit when the latch has tripped.
2.
packages/server/src/gateway/routes/public/agent.ts:905— agent events SSESame shape —
sseManager.addConnection(...)+ initialwriteSSE/ backlog writes happened before thestream.onAbort(cleanup)+ bridge registration at:933+. If abort or a write failure happened in that window, the manager registration was leaked.Fix: register cleanup + bind the abort bridge before adding to
SseManager. Same idempotent latch pattern;connectionAddedflag gates whetherremoveConnectionruns.3.
packages/server/src/mcp-handler.ts:474—withSSEHeartbeatWrapped SSE responses with a heartbeat
setIntervaland returnednew Response(readable)without binding the request abort signal. The pre-existing close/error cleanup catches normal pipe-through closure but not abnormal disconnects (LB timeout, proxy kill, client hard-close) — the same root cause as #833 / #845.Fix: thread the inbound request's
AbortSignalintowithSSEHeartbeatand bind it to the writable via the samebindRequestAbortToStreamhelper. The caller (handleAndMaybeConvert) passesreq.signalthrough.Validation
make typecheck— pre-existing unrelated errors onorganizationId/WorkerTokenDataexist onmain; no new errors from this PR.make build-packages— clean.bun test src/__tests__/unit/sse-abort-bridge.test.ts— 8/8 pass (existing).bun test src/__tests__/unit/sse-bridge-registration-order.test.ts— 7/7 pass (new regression coverage for each route's registration-order fix + a direct check thatwithSSEHeartbeatclears its interval on abrupt abort).bun test src/__tests__/unit/— 184 pass, 0 fail, 16 skip.Notes
check-driftfailure (owletto submodule behind) is unrelated and not blocking.@deprecated.