diff --git a/assistant/openapi.yaml b/assistant/openapi.yaml index 68f7914dd29..9644463c1a0 100644 --- a/assistant/openapi.yaml +++ b/assistant/openapi.yaml @@ -9060,6 +9060,17 @@ paths: description: Scope to a single conversation by an external key (non-vellum channels) or the web idempotency key. Materializes a row on first use. Ignored when conversationId is also provided. + - name: lastSeenSeq + in: query + required: false + schema: + type: string + description: + "Optional reconnect cursor: the highest per-conversation event seq the client has already applied. When set + together with a conversation scope, the daemon replays any buffered events with seq > lastSeenSeq before + going live. If the cursor is older than the ring buffer's oldest entry the connection simply goes live; the + client is expected to detect the gap from the next event's seq and refetch via the messages API. Must be a + non-negative integer." /v1/events/emit: post: operationId: events_emit_post diff --git a/assistant/src/__tests__/runtime-events-sse-reconnect.test.ts b/assistant/src/__tests__/runtime-events-sse-reconnect.test.ts new file mode 100644 index 00000000000..b086bb2946c --- /dev/null +++ b/assistant/src/__tests__/runtime-events-sse-reconnect.test.ts @@ -0,0 +1,267 @@ +/** + * HTTP-layer tests for the B7.2 reconnect handler on GET /v1/events. + * + * Covers: + * - replay of buffered events when `lastSeenSeq` is in-window + * - cursor older than the ring -> connection goes live without any + * extra wire signal (client is expected to detect the seq jump + * and refetch via the messages API) + * - omitted `lastSeenSeq` falls through to legacy live-only behavior + * - dedup against live events that race in mid-replay + * - malformed `lastSeenSeq` query param rejected with 400 + */ +import { afterEach, beforeEach, describe, expect, mock, test } from "bun:test"; + +mock.module("../util/logger.js", () => ({ + getLogger: () => + new Proxy({} as Record, { + get: () => () => {}, + }), +})); + +mock.module("../config/loader.js", () => ({ + getConfig: () => ({ + ui: {}, + model: "test", + provider: "test", + memory: { enabled: false }, + rateLimit: { maxRequestsPerMinute: 0 }, + secretDetection: { enabled: false }, + }), +})); + +import { getOrCreateConversation } from "../memory/conversation-key-store.js"; +import { getDb } from "../memory/db-connection.js"; +import { initializeDb } from "../memory/db-init.js"; +import { buildAssistantEvent } from "../runtime/assistant-event.js"; +import { + _resetConversationStreamsForTesting, + stampAndBuffer, +} from "../runtime/conversation-stream-state.js"; + +initializeDb(); + +const decoder = new TextDecoder(); + +async function readFrame( + reader: ReadableStreamDefaultReader, +): Promise { + const { value, done } = await reader.read(); + expect(done).toBe(false); + return decoder.decode(value); +} + +describe("SSE reconnect replay (B7.2)", () => { + beforeEach(() => { + const db = getDb(); + db.run("DELETE FROM conversation_keys"); + db.run("DELETE FROM conversations"); + _resetConversationStreamsForTesting(); + }); + + afterEach(() => { + _resetConversationStreamsForTesting(); + }); + + test("replays buffered events with seq > lastSeenSeq before the first heartbeat", async () => { + const { conversationId } = getOrCreateConversation("reconnect-replay"); + + // Stamp three events into the ring as if they had already been + // broadcast prior to the client's reconnect. Seqs start at 1, so + // these get seqs 1, 2, 3. + const events = [ + buildAssistantEvent({ type: "pong" }, conversationId), + buildAssistantEvent({ type: "pong" }, conversationId), + buildAssistantEvent({ type: "pong" }, conversationId), + ]; + for (const event of events) stampAndBuffer(event); + + const ac = new AbortController(); + const { handleSubscribeAssistantEvents } = + await import("../runtime/routes/events-routes.js"); + const stream = handleSubscribeAssistantEvents({ + queryParams: { + conversationKey: "reconnect-replay", + lastSeenSeq: "1", + }, + abortSignal: ac.signal, + }); + + const reader = stream.getReader(); + + // Frames 1-2 are the two events with seq > 1 (i.e. seqs 2 and 3), + // emitted before the initial heartbeat. + const frame1 = await readFrame(reader); + expect(frame1).toContain("event: assistant_event"); + expect(frame1).toContain('"seq":2'); + + const frame2 = await readFrame(reader); + expect(frame2).toContain('"seq":3'); + + // Then the heartbeat. + const heartbeat = await readFrame(reader); + expect(heartbeat).toBe(": heartbeat\n\n"); + + ac.abort(); + }); + + test("connects live without replay when lastSeenSeq is older than the ring's oldest entry", async () => { + // When the client's cursor is older than the ring can serve, the + // route deliberately does NOT signal anything special over the + // wire -- the connection just goes live. The client is expected to + // detect the gap from the seq jump on its first live event and + // refetch via the existing messages API. + const { conversationId } = getOrCreateConversation( + "reconnect-out-of-window", + ); + + // Push 202 events through stampAndBuffer so the ring's natural + // count-based eviction (cap 200) drops seqs 1 and 2. A cursor of + // 0 then falls outside what getReplayWindow can serve. + for (let i = 0; i < 202; i++) { + stampAndBuffer(buildAssistantEvent({ type: "pong" }, conversationId)); + } + const { _peekStreamForTesting } = await import( + "../runtime/conversation-stream-state.js" + ); + const peek = _peekStreamForTesting(conversationId); + expect(peek?.oldestSeq).toBe(3); + expect(peek?.newestSeq).toBe(202); + + const ac = new AbortController(); + const { handleSubscribeAssistantEvents } = await import( + "../runtime/routes/events-routes.js" + ); + const stream = handleSubscribeAssistantEvents({ + queryParams: { + conversationKey: "reconnect-out-of-window", + lastSeenSeq: "0", + }, + abortSignal: ac.signal, + }); + + const reader = stream.getReader(); + + // No replay events ahead of the heartbeat -- the cursor was + // unserviceable so the route emits nothing extra. + const heartbeat = await readFrame(reader); + expect(heartbeat).toBe(": heartbeat\n\n"); + + ac.abort(); + }); + + test("omitting lastSeenSeq skips replay entirely (legacy live-only behavior)", async () => { + const { conversationId } = getOrCreateConversation("reconnect-noparam"); + + // Pre-fill the ring -- without the cursor, these MUST NOT be replayed. + stampAndBuffer(buildAssistantEvent({ type: "pong" }, conversationId)); + stampAndBuffer(buildAssistantEvent({ type: "pong" }, conversationId)); + + const ac = new AbortController(); + const { handleSubscribeAssistantEvents } = + await import("../runtime/routes/events-routes.js"); + const stream = handleSubscribeAssistantEvents({ + queryParams: { conversationKey: "reconnect-noparam" }, + abortSignal: ac.signal, + }); + + const reader = stream.getReader(); + + // First frame is the heartbeat -- no replay events ahead of it. + const heartbeat = await readFrame(reader); + expect(heartbeat).toBe(": heartbeat\n\n"); + + ac.abort(); + }); + + test("dedupes a buffered event when it also races through the live callback", async () => { + const { conversationId } = getOrCreateConversation("reconnect-dedup"); + + // Stamp two events. Seqs start at 1, so eventA=1 and eventB=2. + const eventA = buildAssistantEvent({ type: "pong" }, conversationId); + const eventB = buildAssistantEvent({ type: "pong" }, conversationId); + stampAndBuffer(eventA); // seq 1 + stampAndBuffer(eventB); // seq 2 + + const ac = new AbortController(); + const { AssistantEventHub } = + await import("../runtime/assistant-event-hub.js"); + const testHub = new AssistantEventHub(); + const { handleSubscribeAssistantEvents } = + await import("../runtime/routes/events-routes.js"); + const stream = handleSubscribeAssistantEvents( + { + queryParams: { + conversationKey: "reconnect-dedup", + lastSeenSeq: "1", + }, + abortSignal: ac.signal, + }, + { hub: testHub }, + ); + + const reader = stream.getReader(); + + // First frame is the replay of seq=2 (the only one with seq > 1). + const replayed = await readFrame(reader); + expect(replayed).toContain('"seq":2'); + + // Then the heartbeat. + const heartbeat = await readFrame(reader); + expect(heartbeat).toBe(": heartbeat\n\n"); + + // Now publish eventB live (its seq is 2, already replayed). The + // callback's high-water dedup should drop it. Then publish a fresh + // event with seq=3 (via stampAndBuffer + manual publish), which + // SHOULD be delivered. + await testHub.publish(eventB); + + const eventC = buildAssistantEvent({ type: "pong" }, conversationId); + stampAndBuffer(eventC); // seq 3 + await testHub.publish(eventC); + + const liveFrame = await readFrame(reader); + expect(liveFrame).toContain('"seq":3'); + + ac.abort(); + }); + + test("rejects empty lastSeenSeq", async () => { + const { handleSubscribeAssistantEvents } = + await import("../runtime/routes/events-routes.js"); + expect(() => + handleSubscribeAssistantEvents({ + queryParams: { + conversationKey: "reconnect-empty", + lastSeenSeq: "", + }, + }), + ).toThrow(/lastSeenSeq must not be empty/); + }); + + test("rejects non-integer lastSeenSeq", async () => { + const { handleSubscribeAssistantEvents } = + await import("../runtime/routes/events-routes.js"); + expect(() => + handleSubscribeAssistantEvents({ + queryParams: { + conversationKey: "reconnect-float", + lastSeenSeq: "1.5", + }, + }), + ).toThrow(/non-negative integer/); + }); + + test("rejects negative lastSeenSeq", async () => { + const { handleSubscribeAssistantEvents } = + await import("../runtime/routes/events-routes.js"); + expect(() => + handleSubscribeAssistantEvents({ + queryParams: { + conversationKey: "reconnect-neg", + lastSeenSeq: "-1", + }, + }), + ).toThrow(/non-negative integer/); + }); +}); diff --git a/assistant/src/runtime/routes/events-routes.ts b/assistant/src/runtime/routes/events-routes.ts index 570dfce3d37..878bc4d72d4 100644 --- a/assistant/src/runtime/routes/events-routes.ts +++ b/assistant/src/runtime/routes/events-routes.ts @@ -40,6 +40,7 @@ import { assistantEventHub, } from "../assistant-event-hub.js"; import { ACTOR_PRINCIPALS, GATEWAY_PRINCIPALS } from "../auth/route-policy.js"; +import { getReplayWindow } from "../conversation-stream-state.js"; import { resolveActorPrincipalIdForLocalGuardian } from "../local-actor-identity.js"; import { BadRequestError, @@ -264,19 +265,32 @@ export function handleSubscribeAssistantEvents( const rawConversationId = queryParams?.conversationId; const rawConversationKey = queryParams?.conversationKey; - if ( - "conversationId" in (queryParams ?? {}) && - !rawConversationId?.trim() - ) { + const rawLastSeenSeq = queryParams?.lastSeenSeq; + if ("conversationId" in (queryParams ?? {}) && !rawConversationId?.trim()) { throw new BadRequestError("conversationId must not be empty"); } - if ( - "conversationKey" in (queryParams ?? {}) && - !rawConversationKey?.trim() - ) { + if ("conversationKey" in (queryParams ?? {}) && !rawConversationKey?.trim()) { throw new BadRequestError("conversationKey must not be empty"); } + // Parse the optional reconnect cursor. We accept any non-negative integer + // -- including 0, which is the natural cursor for a client that has not + // yet observed any event in this conversation but still wants its full + // ring buffer replayed (as opposed to omitting the param entirely, which + // means "no replay attempt, just connect live"). + let lastSeenSeq: number | null = null; + if (rawLastSeenSeq != null) { + const trimmed = rawLastSeenSeq.trim(); + if (trimmed === "") { + throw new BadRequestError("lastSeenSeq must not be empty"); + } + const parsed = Number(trimmed); + if (!Number.isInteger(parsed) || parsed < 0) { + throw new BadRequestError("lastSeenSeq must be a non-negative integer"); + } + lastSeenSeq = parsed; + } + // ── Client identity from headers ────────────────────────────────────── const rawClientId = headers?.["x-vellum-client-id"]; const rawInterfaceId = headers?.["x-vellum-interface-id"]; @@ -372,9 +386,24 @@ export function handleSubscribeAssistantEvents( } } + // Tracks the highest seq enqueued during the synchronous replay drain. + // Live events that race in with seq <= this watermark are dropped to + // avoid double-delivery -- broadcastMessage stamps and rings BEFORE + // calling publish, so any in-flight event mid-replay is already in the + // replay window we just drained. + let highWaterReplaySeq = -1; + const callback: AssistantEventCallback = (event) => { const controller = controllerRef; if (!controller) return; + if ( + event.seq != null && + highWaterReplaySeq >= 0 && + event.seq <= highWaterReplaySeq + ) { + // Already delivered via replay; skip the duplicate. + return; + } try { if (controller.desiredSize != null && controller.desiredSize <= 0) { shedReporter("callback_backpressure", instrumentation); @@ -432,6 +461,28 @@ export function handleSubscribeAssistantEvents( return; } + // Reconnect replay: when the caller passed lastSeenSeq and the + // subscription is scoped to a single conversation, deliver any + // buffered events the client missed before the first heartbeat. + // + // If the cursor is older than the ring's oldest entry, + // `getReplayWindow` returns `null`. We do not surface that to + // the client over the wire -- the connection just goes live. + // The client detects the gap from the seq jump on its first + // live event and refetches via the existing messages API. + if (lastSeenSeq != null && filter.conversationId) { + const window = getReplayWindow(filter.conversationId, lastSeenSeq); + if (window !== null) { + for (const replayed of window) { + controller.enqueue(encoder.encode(formatSseFrame(replayed))); + instrumentation.eventsDelivered += 1; + if (replayed.seq != null && replayed.seq > highWaterReplaySeq) { + highWaterReplaySeq = replayed.seq; + } + } + } + } + controller.enqueue(encoder.encode(formatSseHeartbeat())); instrumentation.heartbeatsSent += 1; @@ -527,6 +578,11 @@ export const ROUTES: RouteDefinition[] = [ description: "Scope to a single conversation by an external key (non-vellum channels) or the web idempotency key. Materializes a row on first use. Ignored when conversationId is also provided.", }, + { + name: "lastSeenSeq", + description: + "Optional reconnect cursor: the highest per-conversation event seq the client has already applied. When set together with a conversation scope, the daemon replays any buffered events with seq > lastSeenSeq before going live. If the cursor is older than the ring buffer's oldest entry the connection simply goes live; the client is expected to detect the gap from the next event's seq and refetch via the messages API. Must be a non-negative integer.", + }, ], responseHeaders: { "Content-Type": "text/event-stream",