Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions assistant/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9060,6 +9060,17 @@ paths:
description:
Scope to a single conversation by an external key (non-vellum channels) or the web idempotency key.
Materializes a row on first use. Ignored when conversationId is also provided.
- name: lastSeenSeq
in: query
required: false
schema:
type: string
description:
"Optional reconnect cursor: the highest per-conversation event seq the client has already applied. When set
together with a conversation scope, the daemon replays any buffered events with seq > lastSeenSeq before
going live. If the cursor is older than the ring buffer's oldest entry the connection simply goes live; the
client is expected to detect the gap from the next event's seq and refetch via the messages API. Must be a
non-negative integer."
/v1/events/emit:
post:
operationId: events_emit_post
Expand Down
267 changes: 267 additions & 0 deletions assistant/src/__tests__/runtime-events-sse-reconnect.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
/**
* HTTP-layer tests for the B7.2 reconnect handler on GET /v1/events.
*
* Covers:
* - replay of buffered events when `lastSeenSeq` is in-window
* - cursor older than the ring -> connection goes live without any
* extra wire signal (client is expected to detect the seq jump
* and refetch via the messages API)
* - omitted `lastSeenSeq` falls through to legacy live-only behavior
* - dedup against live events that race in mid-replay
* - malformed `lastSeenSeq` query param rejected with 400
*/
import { afterEach, beforeEach, describe, expect, mock, test } from "bun:test";

mock.module("../util/logger.js", () => ({
getLogger: () =>
new Proxy({} as Record<string, unknown>, {
get: () => () => {},
}),
}));

mock.module("../config/loader.js", () => ({
getConfig: () => ({
ui: {},
model: "test",
provider: "test",
memory: { enabled: false },
rateLimit: { maxRequestsPerMinute: 0 },
secretDetection: { enabled: false },
}),
}));

import { getOrCreateConversation } from "../memory/conversation-key-store.js";
import { getDb } from "../memory/db-connection.js";
import { initializeDb } from "../memory/db-init.js";
import { buildAssistantEvent } from "../runtime/assistant-event.js";
import {
_resetConversationStreamsForTesting,
stampAndBuffer,
} from "../runtime/conversation-stream-state.js";

initializeDb();

const decoder = new TextDecoder();

async function readFrame(
reader: ReadableStreamDefaultReader<Uint8Array>,
): Promise<string> {
const { value, done } = await reader.read();
expect(done).toBe(false);
return decoder.decode(value);
}

describe("SSE reconnect replay (B7.2)", () => {
beforeEach(() => {
const db = getDb();
db.run("DELETE FROM conversation_keys");
db.run("DELETE FROM conversations");
_resetConversationStreamsForTesting();
});

afterEach(() => {
_resetConversationStreamsForTesting();
});

test("replays buffered events with seq > lastSeenSeq before the first heartbeat", async () => {
const { conversationId } = getOrCreateConversation("reconnect-replay");

// Stamp three events into the ring as if they had already been
// broadcast prior to the client's reconnect. Seqs start at 1, so
// these get seqs 1, 2, 3.
const events = [
buildAssistantEvent({ type: "pong" }, conversationId),
buildAssistantEvent({ type: "pong" }, conversationId),
buildAssistantEvent({ type: "pong" }, conversationId),
];
for (const event of events) stampAndBuffer(event);

const ac = new AbortController();
const { handleSubscribeAssistantEvents } =
await import("../runtime/routes/events-routes.js");
const stream = handleSubscribeAssistantEvents({
queryParams: {
conversationKey: "reconnect-replay",
lastSeenSeq: "1",
},
abortSignal: ac.signal,
});

const reader = stream.getReader();

// Frames 1-2 are the two events with seq > 1 (i.e. seqs 2 and 3),
// emitted before the initial heartbeat.
const frame1 = await readFrame(reader);
expect(frame1).toContain("event: assistant_event");
expect(frame1).toContain('"seq":2');

const frame2 = await readFrame(reader);
expect(frame2).toContain('"seq":3');

// Then the heartbeat.
const heartbeat = await readFrame(reader);
expect(heartbeat).toBe(": heartbeat\n\n");

ac.abort();
});

test("connects live without replay when lastSeenSeq is older than the ring's oldest entry", async () => {
// When the client's cursor is older than the ring can serve, the
// route deliberately does NOT signal anything special over the
// wire -- the connection just goes live. The client is expected to
// detect the gap from the seq jump on its first live event and
// refetch via the existing messages API.
const { conversationId } = getOrCreateConversation(
"reconnect-out-of-window",
);

// Push 202 events through stampAndBuffer so the ring's natural
// count-based eviction (cap 200) drops seqs 1 and 2. A cursor of
// 0 then falls outside what getReplayWindow can serve.
for (let i = 0; i < 202; i++) {
stampAndBuffer(buildAssistantEvent({ type: "pong" }, conversationId));
}
const { _peekStreamForTesting } = await import(
"../runtime/conversation-stream-state.js"
);
const peek = _peekStreamForTesting(conversationId);
expect(peek?.oldestSeq).toBe(3);
expect(peek?.newestSeq).toBe(202);

const ac = new AbortController();
const { handleSubscribeAssistantEvents } = await import(
"../runtime/routes/events-routes.js"
);
const stream = handleSubscribeAssistantEvents({
queryParams: {
conversationKey: "reconnect-out-of-window",
lastSeenSeq: "0",
},
abortSignal: ac.signal,
});

const reader = stream.getReader();

// No replay events ahead of the heartbeat -- the cursor was
// unserviceable so the route emits nothing extra.
const heartbeat = await readFrame(reader);
expect(heartbeat).toBe(": heartbeat\n\n");

ac.abort();
});

test("omitting lastSeenSeq skips replay entirely (legacy live-only behavior)", async () => {
const { conversationId } = getOrCreateConversation("reconnect-noparam");

// Pre-fill the ring -- without the cursor, these MUST NOT be replayed.
stampAndBuffer(buildAssistantEvent({ type: "pong" }, conversationId));
stampAndBuffer(buildAssistantEvent({ type: "pong" }, conversationId));

const ac = new AbortController();
const { handleSubscribeAssistantEvents } =
await import("../runtime/routes/events-routes.js");
const stream = handleSubscribeAssistantEvents({
queryParams: { conversationKey: "reconnect-noparam" },
abortSignal: ac.signal,
});

const reader = stream.getReader();

// First frame is the heartbeat -- no replay events ahead of it.
const heartbeat = await readFrame(reader);
expect(heartbeat).toBe(": heartbeat\n\n");

ac.abort();
});

test("dedupes a buffered event when it also races through the live callback", async () => {
const { conversationId } = getOrCreateConversation("reconnect-dedup");

// Stamp two events. Seqs start at 1, so eventA=1 and eventB=2.
const eventA = buildAssistantEvent({ type: "pong" }, conversationId);
const eventB = buildAssistantEvent({ type: "pong" }, conversationId);
stampAndBuffer(eventA); // seq 1
stampAndBuffer(eventB); // seq 2

const ac = new AbortController();
const { AssistantEventHub } =
await import("../runtime/assistant-event-hub.js");
const testHub = new AssistantEventHub();
const { handleSubscribeAssistantEvents } =
await import("../runtime/routes/events-routes.js");
const stream = handleSubscribeAssistantEvents(
{
queryParams: {
conversationKey: "reconnect-dedup",
lastSeenSeq: "1",
},
abortSignal: ac.signal,
},
{ hub: testHub },
);

const reader = stream.getReader();

// First frame is the replay of seq=2 (the only one with seq > 1).
const replayed = await readFrame(reader);
expect(replayed).toContain('"seq":2');

// Then the heartbeat.
const heartbeat = await readFrame(reader);
expect(heartbeat).toBe(": heartbeat\n\n");

// Now publish eventB live (its seq is 2, already replayed). The
// callback's high-water dedup should drop it. Then publish a fresh
// event with seq=3 (via stampAndBuffer + manual publish), which
// SHOULD be delivered.
await testHub.publish(eventB);

const eventC = buildAssistantEvent({ type: "pong" }, conversationId);
stampAndBuffer(eventC); // seq 3
await testHub.publish(eventC);

const liveFrame = await readFrame(reader);
expect(liveFrame).toContain('"seq":3');

ac.abort();
});

test("rejects empty lastSeenSeq", async () => {
const { handleSubscribeAssistantEvents } =
await import("../runtime/routes/events-routes.js");
expect(() =>
handleSubscribeAssistantEvents({
queryParams: {
conversationKey: "reconnect-empty",
lastSeenSeq: "",
},
}),
).toThrow(/lastSeenSeq must not be empty/);
});

test("rejects non-integer lastSeenSeq", async () => {
const { handleSubscribeAssistantEvents } =
await import("../runtime/routes/events-routes.js");
expect(() =>
handleSubscribeAssistantEvents({
queryParams: {
conversationKey: "reconnect-float",
lastSeenSeq: "1.5",
},
}),
).toThrow(/non-negative integer/);
});

test("rejects negative lastSeenSeq", async () => {
const { handleSubscribeAssistantEvents } =
await import("../runtime/routes/events-routes.js");
expect(() =>
handleSubscribeAssistantEvents({
queryParams: {
conversationKey: "reconnect-neg",
lastSeenSeq: "-1",
},
}),
).toThrow(/non-negative integer/);
});
});
Loading
Loading