-
Notifications
You must be signed in to change notification settings - Fork 20
fix(server): tear down SSE keepalive + listener on abnormal disconnect #833
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,205 @@ | ||
| /** | ||
| * Regression test for lobu-ai/lobu#782: SSE keepalive timer + emitter listener | ||
| * must be torn down when the underlying request aborts, not only when the | ||
| * `ReadableStream.cancel()` callback fires. | ||
| * | ||
| * Reproduces the abnormal-disconnect path by: | ||
| * 1. Wiring an AbortController as `c.req.raw.signal`. | ||
| * 2. Letting the stream start (registers the listener + interval). | ||
| * 3. Aborting without ever consuming or cancelling the stream. | ||
| * 4. Asserting the listener stops receiving events and the interval is cleared. | ||
| */ | ||
| import { afterEach, beforeEach, describe, expect, it, spyOn } from 'bun:test'; | ||
| import * as invalidationEmitter from '../../events/emitter'; | ||
| import { streamInvalidationEvents } from '../../events/sse'; | ||
|
|
||
| function buildContext(signal: AbortSignal) { | ||
| return { | ||
| req: { raw: { signal } }, | ||
| header: () => {}, | ||
| body: (stream: ReadableStream) => new Response(stream), | ||
| }; | ||
| } | ||
|
|
||
| describe('streamInvalidationEvents cleanup', () => { | ||
| const originalSetInterval = globalThis.setInterval; | ||
| const originalClearInterval = globalThis.clearInterval; | ||
| const activeTimers = new Set<unknown>(); | ||
|
|
||
| beforeEach(() => { | ||
| activeTimers.clear(); | ||
| globalThis.setInterval = ((fn: () => void, ms: number) => { | ||
| const handle = originalSetInterval(fn, ms); | ||
| activeTimers.add(handle); | ||
| return handle; | ||
| }) as typeof setInterval; | ||
| globalThis.clearInterval = ((handle: unknown) => { | ||
| activeTimers.delete(handle); | ||
| return originalClearInterval(handle as Parameters<typeof clearInterval>[0]); | ||
| }) as typeof clearInterval; | ||
| }); | ||
|
|
||
| afterEach(() => { | ||
| for (const handle of activeTimers) { | ||
| originalClearInterval(handle as Parameters<typeof clearInterval>[0]); | ||
| } | ||
| activeTimers.clear(); | ||
| globalThis.setInterval = originalSetInterval; | ||
| globalThis.clearInterval = originalClearInterval; | ||
| }); | ||
|
|
||
| it('tears down listener + interval when the request signal aborts (no cancel())', async () => { | ||
| const orgId = 'org-abort-' + Math.random().toString(36).slice(2); | ||
| const ctrl = new AbortController(); | ||
| const ctx = buildContext(ctrl.signal); | ||
|
|
||
| const response = streamInvalidationEvents( | ||
| ctx as unknown as Parameters<typeof streamInvalidationEvents>[0], | ||
| orgId | ||
| ); | ||
| // Pull the handshake frame so start() has fully run. | ||
| const reader = response.body!.getReader(); | ||
| await reader.read(); | ||
|
|
||
| expect(activeTimers.size).toBe(1); | ||
|
|
||
| // Sanity: while subscribed, an emit on this org reaches a probe co-listener. | ||
| let probeHits = 0; | ||
| const unsubProbe = invalidationEmitter.subscribe(orgId, () => { | ||
| probeHits++; | ||
| }); | ||
| invalidationEmitter.emit(orgId, { keys: ['x'] }); | ||
| expect(probeHits).toBe(1); | ||
| unsubProbe(); | ||
|
|
||
| // Simulate abnormal disconnect: socket aborted, no cancel(). | ||
| ctrl.abort(); | ||
|
|
||
| // Abort handler is synchronous; cleanup should have run. | ||
| expect(activeTimers.size).toBe(0); | ||
|
|
||
| // After cleanup, the stream's listener is gone — the emitter map should | ||
| // no longer carry an entry for this org (subscribe() with size==0 path | ||
| // deletes the key after unsubscribe). | ||
| let secondProbeHits = 0; | ||
| const unsubProbe2 = invalidationEmitter.subscribe(orgId, () => { | ||
| secondProbeHits++; | ||
| }); | ||
| invalidationEmitter.emit(orgId, { keys: ['y'] }); | ||
| expect(secondProbeHits).toBe(1); // only the probe sees it | ||
| unsubProbe2(); | ||
| }); | ||
|
|
||
| it('cleanup is idempotent across abort + cancel()', async () => { | ||
| const orgId = 'org-both-' + Math.random().toString(36).slice(2); | ||
| const ctrl = new AbortController(); | ||
| const ctx = buildContext(ctrl.signal); | ||
|
|
||
| const response = streamInvalidationEvents( | ||
| ctx as unknown as Parameters<typeof streamInvalidationEvents>[0], | ||
| orgId | ||
| ); | ||
| const reader = response.body!.getReader(); | ||
| await reader.read(); | ||
|
|
||
| expect(activeTimers.size).toBe(1); | ||
|
|
||
| ctrl.abort(); | ||
| await reader.cancel().catch(() => {}); | ||
|
|
||
| // Either path tearing down twice must not throw and must leave 0 timers. | ||
| expect(activeTimers.size).toBe(0); | ||
| }); | ||
|
|
||
| it('cleans up when client cancels before abort', async () => { | ||
| const orgId = 'org-cancel-' + Math.random().toString(36).slice(2); | ||
| const ctrl = new AbortController(); | ||
| const ctx = buildContext(ctrl.signal); | ||
|
|
||
| const response = streamInvalidationEvents( | ||
| ctx as unknown as Parameters<typeof streamInvalidationEvents>[0], | ||
| orgId | ||
| ); | ||
| const reader = response.body!.getReader(); | ||
| await reader.read(); | ||
|
|
||
| expect(activeTimers.size).toBe(1); | ||
|
|
||
| await reader.cancel(); | ||
|
|
||
| expect(activeTimers.size).toBe(0); | ||
| }); | ||
|
coderabbitai[bot] marked this conversation as resolved.
|
||
|
|
||
| it('calls the emitter unsubscribe exactly once on abort', async () => { | ||
| // Pi-review nit #1: prove the stream's invalidation listener is | ||
| // unsubscribed, not just that an unrelated probe still receives events. | ||
| // Spy on subscribe so we can attribute the returned unsubscribe to the | ||
| // stream and count its invocations. | ||
| const realSubscribe = invalidationEmitter.subscribe; | ||
| let unsubscribeCalls = 0; | ||
| const subscribeSpy = spyOn(invalidationEmitter, 'subscribe').mockImplementation( | ||
| (organizationId, listener) => { | ||
| const inner = realSubscribe(organizationId, listener); | ||
| return () => { | ||
| unsubscribeCalls++; | ||
| inner(); | ||
| }; | ||
| } | ||
| ); | ||
|
|
||
| try { | ||
| const orgId = 'org-unsub-' + Math.random().toString(36).slice(2); | ||
| const ctrl = new AbortController(); | ||
| const ctx = buildContext(ctrl.signal); | ||
|
|
||
| const response = streamInvalidationEvents( | ||
| ctx as unknown as Parameters<typeof streamInvalidationEvents>[0], | ||
| orgId | ||
| ); | ||
| const reader = response.body!.getReader(); | ||
| await reader.read(); | ||
|
|
||
| expect(subscribeSpy).toHaveBeenCalledTimes(1); | ||
| expect(unsubscribeCalls).toBe(0); | ||
|
|
||
| ctrl.abort(); | ||
|
|
||
| expect(unsubscribeCalls).toBe(1); | ||
|
|
||
| // Idempotent: a follow-up cancel must NOT re-call unsubscribe. | ||
| await reader.cancel().catch(() => {}); | ||
| expect(unsubscribeCalls).toBe(1); | ||
| } finally { | ||
| subscribeSpy.mockRestore(); | ||
| } | ||
| }); | ||
|
|
||
| it('does not register a listener or interval if the request is already aborted', async () => { | ||
| // Pi-review nit #2: cover the early-aborted-signal branch in | ||
| // streamInvalidationEvents.start(). With a pre-aborted signal, start() | ||
| // must close the controller without ever calling subscribe() or | ||
| // setInterval(). | ||
| const subscribeSpy = spyOn(invalidationEmitter, 'subscribe'); | ||
|
|
||
| try { | ||
| const orgId = 'org-preaborted-' + Math.random().toString(36).slice(2); | ||
| const ctrl = new AbortController(); | ||
| ctrl.abort(); | ||
| const ctx = buildContext(ctrl.signal); | ||
|
|
||
| const response = streamInvalidationEvents( | ||
| ctx as unknown as Parameters<typeof streamInvalidationEvents>[0], | ||
| orgId | ||
| ); | ||
| const reader = response.body!.getReader(); | ||
| // The stream should be closed immediately; first read resolves done. | ||
| const result = await reader.read(); | ||
| expect(result.done).toBe(true); | ||
|
|
||
| expect(subscribeSpy).not.toHaveBeenCalled(); | ||
| expect(activeTimers.size).toBe(0); | ||
| } finally { | ||
| subscribeSpy.mockRestore(); | ||
| } | ||
| }); | ||
| }); | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,110 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * SSE helper for invalidation event streams. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * Both the org-scoped (`/api/:orgSlug/events`) and the public | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * (`/api/:orgSlug/public/events`) streams previously bound cleanup only to | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * `ReadableStream.cancel()`. Under abnormal disconnects (LB timeout, proxy | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * kill, client hard close) `cancel()` does not always fire — the keepalive | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * `setInterval` keeps running and the emitter listener stays registered, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * leaking memory + descriptors. Issue lobu-ai/lobu#782. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * Fix: bind cleanup to the per-request abort signal (`c.req.raw.signal`), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * which fires on socket close regardless of stream-cancel semantics. Keep | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * `cancel()` as a redundant trigger; both routes through an idempotent | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * `runCleanup()` so the second one is a no-op. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import type { Context } from 'hono'; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import * as invalidationEmitter from './emitter'; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| interface InvalidationEvent { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| keys: string[]; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| interface StreamOptions { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /** Optional filter; return `null` to drop the event for this subscriber. */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| filter?: (event: InvalidationEvent) => InvalidationEvent | null; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const KEEPALIVE_INTERVAL_MS = 30000; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| export function streamInvalidationEvents( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| c: Context, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| organizationId: string, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| options: StreamOptions = {} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ): Response { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const encoder = new TextEncoder(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const requestSignal = c.req.raw.signal; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let cleanedUp = false; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let cleanup: (() => void) | null = null; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let onAbort: (() => void) | null = null; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const runCleanup = () => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (cleanedUp) return; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| cleanedUp = true; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (onAbort && requestSignal) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| requestSignal.removeEventListener('abort', onAbort); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| onAbort = null; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| cleanup?.(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| cleanup = null; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const stream = new ReadableStream({ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| start(controller) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // If the client already aborted between handler invocation and stream | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // start, bail out immediately rather than registering a leaking listener. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (requestSignal?.aborted) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| controller.close(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } catch { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // already closed | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| controller.enqueue(encoder.encode('event: connected\ndata: {}\n\n')); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const unsubscribe = invalidationEmitter.subscribe(organizationId, (event) => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const forwarded = options.filter ? options.filter(event) : event; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (!forwarded) return; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const data = JSON.stringify(forwarded); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| controller.enqueue(encoder.encode(`event: invalidate\ndata: ${data}\n\n`)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } catch { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Controller closed — fall through; abort/cancel will tear down. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const keepAlive = setInterval(() => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| controller.enqueue(encoder.encode(': keepalive\n\n')); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } catch { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Controller closed — cancel/abort will tear down. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }, KEEPALIVE_INTERVAL_MS); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| cleanup = () => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| unsubscribe(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| clearInterval(keepAlive); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| controller.close(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } catch { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // already closed by cancel() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| onAbort = () => runCleanup(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| requestSignal?.addEventListener('abort', onAbort, { once: true }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+57
to
+99
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🌐 Web query:
💡 Result: In JavaScript, if an AbortSignal is already aborted (i.e., signal.aborted is true) before you call addEventListener('abort',...), the event listener is not automatically invoked [1]. The 'abort' event is fired only once, at the moment the abort occurs [2][1]. If you attach a listener after that moment, the event has already passed, and your listener will not be triggered [1]. To handle this scenario correctly, you should always check the state of the signal immediately after adding the listener or before performing an operation [1]. A common pattern is to check signal.aborted or call signal.throwIfAborted to handle the already-aborted state synchronously [3][1]. For example: if (signal.aborted) { // Handle the already-aborted state immediately handleAbort(signal.reason); } else { // Attach the listener for future aborts signal.addEventListener('abort', => handleAbort(signal.reason)); } Alternatively, many APIs that accept an AbortSignal (like fetch) handle this check internally, automatically rejecting the associated promise if the signal is already aborted [1]. Citations:
Close the abort-listener race window to avoid resource leaks. Between line 57 and line 98, an abort can occur after the initial aborted check but before the listener is registered. Since AbortSignal fires the abort event only once, a listener registered after the abort has already fired will never be invoked, leaving cleanup skipped and resources (controller, interval, subscription) dangling. Suggested patch onAbort = () => runCleanup();
requestSignal?.addEventListener('abort', onAbort, { once: true });
+ // Handle aborts that happen between the early `aborted` check
+ // and listener registration.
+ if (requestSignal?.aborted) {
+ runCleanup();
+ return;
+ }📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| cancel() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| runCleanup(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| c.header('Content-Type', 'text/event-stream'); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| c.header('Cache-Control', 'no-cache'); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| c.header('Connection', 'keep-alive'); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return c.body(stream); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Error suppression may hide unexpected failures.
The test catches and ignores all errors from
cancel(). If the cleanup is truly idempotent, no error should occur. Suppressing errors might hide bugs in the implementation wherecancel()unexpectedly throws after abort.🔍 Suggested fix
Remove the error suppression to let unexpected errors fail the test:
Alternatively, if cancel() is expected to throw in this scenario, verify the specific error:
📝 Committable suggestion
🤖 Prompt for AI Agents