Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
203 changes: 203 additions & 0 deletions packages/server/src/__tests__/unit/sse-abort-bridge.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
/**
* Regression test for lobu-ai/lobu#782: the abort-bridge helper used by the
* Hono-streaming SSE routes (agent SSE channel + worker SSE channel) must
* tear the stream down when the per-request `AbortSignal` fires, even when
* `ReadableStream.cancel()` never runs.
*
* The helper is a thin pure-JS bridge — these tests exercise it directly
* against a stub `AbortableStream`, mirroring the contract documented in
* `events/sse-abort-bridge.ts`.
*/
import { describe, expect, it } from 'bun:test';
import {
bindRequestAbortToStream,
type AbortableStream,
} from '../../events/sse-abort-bridge';

function fakeStream(): AbortableStream & { abortCalls: number } {
return {
aborted: false,
closed: false,
abortCalls: 0,
abort() {
this.abortCalls++;
(this as { aborted: boolean }).aborted = true;
},
};
}

describe('bindRequestAbortToStream', () => {
it('calls stream.abort() when the request signal aborts', () => {
const ctrl = new AbortController();
const stream = fakeStream();

const detach = bindRequestAbortToStream(ctrl.signal, stream);
expect(stream.abortCalls).toBe(0);

ctrl.abort();
expect(stream.abortCalls).toBe(1);

// detach is safe to call after abort has fired.
detach();
});

it('fires synchronously if the signal is already aborted at bind time', () => {
const ctrl = new AbortController();
ctrl.abort();
const stream = fakeStream();

bindRequestAbortToStream(ctrl.signal, stream);

expect(stream.abortCalls).toBe(1);
});

it('detach() prevents the listener from firing later', () => {
const ctrl = new AbortController();
const stream = fakeStream();

const detach = bindRequestAbortToStream(ctrl.signal, stream);
detach();

ctrl.abort();
expect(stream.abortCalls).toBe(0);
});

it('detach() is idempotent', () => {
const ctrl = new AbortController();
const stream = fakeStream();

const detach = bindRequestAbortToStream(ctrl.signal, stream);
detach();
detach(); // must not throw
expect(stream.abortCalls).toBe(0);
});

it('does not call abort() if the stream already closed', () => {
const ctrl = new AbortController();
const stream = fakeStream();
(stream as { closed: boolean }).closed = true;

bindRequestAbortToStream(ctrl.signal, stream);
ctrl.abort();

expect(stream.abortCalls).toBe(0);
});

it('does not call abort() if the stream already aborted', () => {
const ctrl = new AbortController();
const stream = fakeStream();
(stream as { aborted: boolean }).aborted = true;

bindRequestAbortToStream(ctrl.signal, stream);
ctrl.abort();

expect(stream.abortCalls).toBe(0);
});

it('returns a no-op detach when no signal is provided', () => {
const stream = fakeStream();
const detach = bindRequestAbortToStream(undefined, stream);
detach(); // must not throw
expect(stream.abortCalls).toBe(0);
});
});

describe('bindRequestAbortToStream + Hono streamSSE (integration)', () => {
/**
* Reproduces the #782 leak shape end-to-end:
* 1. Mount a Hono route that uses `streamSSE` + a per-iteration heartbeat
* interval + a `while !aborted` loop.
* 2. Wire the abort bridge.
* 3. Hit the route and abort the request without consuming the body or
* cancelling the response stream.
* 4. Assert the heartbeat interval has been cleared and the loop has
* exited within a short bounded wait.
*
* Before the fix, the loop runs forever and the interval keeps firing —
* the test would time out the post-abort assertion.
*/
const originalSetInterval = globalThis.setInterval;
const originalClearInterval = globalThis.clearInterval;

it('tears down heartbeat + loop on request abort', async () => {
const { Hono } = await import('hono');
const { streamSSE } = await import('hono/streaming');

const activeTimers = new Set<unknown>();
globalThis.setInterval = ((fn: () => void, ms: number) => {
const handle = originalSetInterval(fn, ms);
activeTimers.add(handle);
return handle;
}) as typeof setInterval;
globalThis.clearInterval = ((handle: unknown) => {
activeTimers.delete(handle);
return originalClearInterval(
handle as Parameters<typeof clearInterval>[0]
);
}) as typeof clearInterval;

try {
let loopExited = false;
const app = new Hono();
app.get('/sse', (c) =>
streamSSE(c, async (stream) => {
if (c.req.raw.signal?.aborted) return;

const heartbeat = setInterval(() => {
stream.writeSSE({ event: 'ping', data: 'x' }).catch(() => {});
}, 50);

const detach = bindRequestAbortToStream(c.req.raw.signal, stream);
try {
await stream.writeSSE({ event: 'connected', data: '{}' });
while (!stream.aborted && !stream.closed) {
await stream.sleep(25);
}
} finally {
clearInterval(heartbeat);
detach();
loopExited = true;
}
})
);

const ctrl = new AbortController();
const req = new Request('http://localhost/sse', {
method: 'GET',
signal: ctrl.signal,
});

// Fire the request without awaiting the body — we want to abort while
// the handler is mid-loop.
const respPromise = app.fetch(req);
// Give the handler time to register interval + write connected event.
await new Promise((r) => setTimeout(r, 75));

expect(activeTimers.size).toBeGreaterThanOrEqual(1);

ctrl.abort();

// Wait for the loop to exit (max 500ms — sleep granularity is 25ms).
const deadline = Date.now() + 500;
while (!loopExited && Date.now() < deadline) {
await new Promise((r) => setTimeout(r, 10));
}
expect(loopExited).toBe(true);
expect(activeTimers.size).toBe(0);

// Drain the response so we don't leak the body.
try {
const resp = await Promise.resolve(respPromise);
await resp.body?.cancel();
} catch {
/* ignore */
}
} finally {
globalThis.setInterval = originalSetInterval;
globalThis.clearInterval = originalClearInterval;
for (const handle of activeTimers) {
originalClearInterval(handle as Parameters<typeof clearInterval>[0]);
}
}
});
});
64 changes: 64 additions & 0 deletions packages/server/src/events/sse-abort-bridge.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/**
* `bindRequestAbortToStream` — bridge a Hono per-request `AbortSignal` to a
* Hono `StreamingApi` so abnormal disconnects (LB idle timeout, proxy kill,
* client hard close) actually tear the stream down.
*
* Hono's `streamSSE` / `stream` helpers only invoke `streamWriter.onAbort`
* subscribers when the underlying `ReadableStream.cancel()` runs. On
* Node + current Bun, `cancel()` doesn't fire for abnormal disconnects —
* which leaves any heartbeat `setInterval`, registered listener, or
* `while !aborted` loop running forever. Issue lobu-ai/lobu#782.
*
* The invalidation-event streams already use a raw `ReadableStream` with the
* same bridge wired inline (`events/sse.ts`, fixed in PR #833). The two
* remaining Hono-streaming routes — the agent SSE channel at
* `/api/v1/agents/:agentId/events` and the worker SSE channel at
* `/worker/stream` — need the same bridge. This helper centralizes it so
* the pattern doesn't drift again the next time we add an SSE route.
*
* Caller contract:
* const detach = bindRequestAbortToStream(c.req.raw.signal, stream);
* try { ... } finally { detach(); }
*
* `detach()` is idempotent. The function returns immediately (and `detach`
* is a no-op) if the signal is already aborted — the caller is expected to
* check `signal.aborted` and bail before doing real work.
*/

/** Minimal shape we need off Hono's `StreamingApi` / `SSEStreamingApi`. */
export interface AbortableStream {
readonly aborted: boolean;
readonly closed: boolean;
abort(): void;
}

export function bindRequestAbortToStream(
signal: AbortSignal | undefined,
stream: AbortableStream
): () => void {
if (!signal) return () => {};

const onAbort = () => {
if (!stream.aborted && !stream.closed) {
stream.abort();
}
};

// If already aborted, fire synchronously so the caller's loop sees it on
// the next check. addEventListener with `once: true` would also fire if
// re-dispatched, but the platform doesn't re-dispatch already-aborted
// signals — so we have to do it ourselves.
if (signal.aborted) {
onAbort();
return () => {};
}

signal.addEventListener('abort', onAbort, { once: true });

let detached = false;
return () => {
if (detached) return;
detached = true;
signal.removeEventListener('abort', onAbort);
};
}
33 changes: 30 additions & 3 deletions packages/server/src/gateway/gateway/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { createLogger, encrypt, verifyWorkerToken } from "@lobu/core";
import type { Context } from "hono";
import { Hono } from "hono";
import { stream } from "hono/streaming";
import { bindRequestAbortToStream } from "../../events/sse-abort-bridge.js";
import type { ApiKeyProviderModule } from "../auth/api-key-provider-module.js";
import { getRevokedTokenStore } from "../auth/revoked-token-store.js";
import type { McpConfigService } from "../auth/mcp/config-service.js";
Expand Down Expand Up @@ -189,10 +190,27 @@ export class WorkerGateway {
const httpPortParam = c.req.query("httpPort");
const httpPort = httpPortParam ? parseInt(httpPortParam, 10) : undefined;

// Create an SSE stream
// Create an SSE stream.
//
// Hono's `stream()` only fires `streamWriter.onAbort()` from
// `ReadableStream.cancel()` — which doesn't run on abnormal disconnects
// (LB idle timeout, intermediate proxy kill, worker pod hard exit). On
// Node + current Bun the per-request `AbortSignal` is the only reliable
// trigger. Without bridging it, a stale worker SSE leaks the writer
// closure + `while !isClosed` loop until the 10-minute stale-cleanup
// sweep catches up. Same retain pattern fixed for the invalidation
// streams in #833. Refs #782.
const requestSignal = c.req.raw.signal;

return stream(c, async (streamWriter) => {
let isClosed = false;

// If the client already aborted between handler invocation and stream
// body execution, bail out before registering anything.
if (requestSignal?.aborted) {
return;
}

// Create an SSE writer adapter
const sseWriter: SSEWriter = {
write: (data: string): boolean => {
Expand All @@ -218,6 +236,11 @@ export class WorkerGateway {
},
};

const detachAbortBridge = bindRequestAbortToStream(
requestSignal,
streamWriter
);

// Set SSE headers
c.header("Content-Type", "text/event-stream");
c.header("Cache-Control", "no-cache");
Expand Down Expand Up @@ -269,8 +292,12 @@ export class WorkerGateway {
});

// Keep the connection open until the stream is actually aborted.
while (!isClosed) {
await streamWriter.sleep(1000);
try {
while (!isClosed) {
await streamWriter.sleep(1000);
}
} finally {
detachAbortBridge();
}
});
}
Expand Down
39 changes: 34 additions & 5 deletions packages/server/src/gateway/routes/public/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
} from "@lobu/core";
import type { Context } from "hono";
import { streamSSE } from "hono/streaming";
import { bindRequestAbortToStream } from "../../../events/sse-abort-bridge.js";
import { z } from "zod";
import type { AgentMetadataStore } from "../../auth/agent-metadata-store.js";
import { getRevokedTokenStore } from "../../auth/revoked-token-store.js";
Expand Down Expand Up @@ -869,8 +870,25 @@ export function createAgentApi(config: AgentApiConfig): OpenAPIHono {
);
}

// Return SSE stream
// Return SSE stream.
//
// Hono's `streamSSE` only fires `stream.onAbort()` when the underlying
// `ReadableStream.cancel()` runs — which doesn't happen on abnormal
// disconnects (LB idle timeout, intermediate proxy kill, client hard
// close). On Node + current Bun, `signal.abort` is the only reliable
// trigger. Without it the heartbeat interval keeps firing, the
// `sseManager` retains the dead connection, and the `while !aborted`
// loop never exits — the same retain pattern fixed for the invalidation
// streams in #833. Refs #782.
const requestSignal = c.req.raw.signal;

return streamSSE(c, async (stream) => {
// If the client already aborted between handler invocation and stream
// body execution, bail out before registering anything.
if (requestSignal?.aborted) {
return;
}

sseManager.addConnection(sseKey, stream);

await stream.writeSSE({
Expand Down Expand Up @@ -899,14 +917,25 @@ export function createAgentApi(config: AgentApiConfig): OpenAPIHono {
}
}, 30000);

stream.onAbort(() => {
let cleanedUp = false;
const cleanup = () => {
if (cleanedUp) return;
cleanedUp = true;
clearInterval(heartbeatInterval);
sseManager.removeConnection(sseKey, stream);
logger.info(`SSE connection closed for session ${sseKey}`);
});
};

stream.onAbort(cleanup);
const detachAbortBridge = bindRequestAbortToStream(requestSignal, stream);

while (true) {
await stream.sleep(1000);
try {
while (!stream.aborted && !stream.closed) {
await stream.sleep(1000);
}
} finally {
detachAbortBridge();
cleanup();
}
});
});
Expand Down
Loading