diff --git a/assistant/src/__tests__/assistant-event.test.ts b/assistant/src/__tests__/assistant-event.test.ts index efda0025b4e..5cbb6838c25 100644 --- a/assistant/src/__tests__/assistant-event.test.ts +++ b/assistant/src/__tests__/assistant-event.test.ts @@ -111,6 +111,21 @@ describe("formatSseFrame", () => { const frame = formatSseFrame(baseEvent); expect(frame.endsWith("\n\n")).toBe(true); }); + + test("seq lands in the JSON payload (envelope) when set, never in SSE id", () => { + const event: AssistantEvent = { + ...baseEvent, + id: "uuid-event-id", + seq: 42, + }; + const frame = formatSseFrame(event); + // SSE wire id is always the UUID -- replay cursor is decoupled from + // the SSE protocol and lives on the envelope only. + const idLine = frame.split("\n").find((l) => l.startsWith("id: "))!; + expect(idLine).toBe("id: uuid-event-id"); + // seq is on the envelope; consumers read it from the JSON payload. + expect(frame).toContain('"seq":42'); + }); }); // ── Heartbeat tests ─────────────────────────────────────────────────────────── diff --git a/assistant/src/__tests__/conversation-stream-state.test.ts b/assistant/src/__tests__/conversation-stream-state.test.ts new file mode 100644 index 00000000000..951047cdf3a --- /dev/null +++ b/assistant/src/__tests__/conversation-stream-state.test.ts @@ -0,0 +1,264 @@ +import { beforeEach, describe, expect, test } from "bun:test"; + +import type { AssistantEvent } from "../runtime/assistant-event.js"; +import { + _peekStreamForTesting, + _resetConversationStreamsForTesting, + clearConversationStream, + getReplayWindow, + stampAndBuffer, +} from "../runtime/conversation-stream-state.js"; + +const CONV = "conv_test"; + +function mkEvent(overrides: Partial = {}): AssistantEvent { + return { + id: `uuid-${Math.random().toString(36).slice(2, 10)}`, + conversationId: CONV, + emittedAt: new Date().toISOString(), + message: { type: "assistant_text_delta", conversationId: CONV, text: "x" }, + ...overrides, + } as AssistantEvent; +} + +describe("conversation-stream-state", () => { + beforeEach(() => { + _resetConversationStreamsForTesting(); + }); + + describe("stampAndBuffer", () => { + test("assigns monotonic seq starting at 1 per conversation", () => { + const a = mkEvent(); + const b = mkEvent(); + const c = mkEvent(); + stampAndBuffer(a); + stampAndBuffer(b); + stampAndBuffer(c); + expect(a.seq).toBe(1); + expect(b.seq).toBe(2); + expect(c.seq).toBe(3); + }); + + test("seq is per-conversation, not global", () => { + const a = mkEvent({ conversationId: "conv_a" }); + const b = mkEvent({ conversationId: "conv_b" }); + const a2 = mkEvent({ conversationId: "conv_a" }); + stampAndBuffer(a); + stampAndBuffer(b); + stampAndBuffer(a2); + expect(a.seq).toBe(1); + expect(b.seq).toBe(1); // independent counter + expect(a2.seq).toBe(2); + }); + + test("no-op when conversationId is absent (unscoped broadcasts)", () => { + const event = mkEvent({ conversationId: undefined }); + stampAndBuffer(event); + expect(event.seq).toBeUndefined(); + }); + + test("pushes event onto ring buffer", () => { + stampAndBuffer(mkEvent()); + stampAndBuffer(mkEvent()); + const peek = _peekStreamForTesting(CONV); + expect(peek?.ringLength).toBe(2); + expect(peek?.oldestSeq).toBe(1); + expect(peek?.newestSeq).toBe(2); + }); + + test("replayable: false stamps seq but skips ring push", () => { + const targeted = mkEvent(); + stampAndBuffer(targeted, { replayable: false }); + expect(targeted.seq).toBe(1); // still stamped for wire-side ordering + const peek = _peekStreamForTesting(CONV); + // State exists (we created it to assign seq) but ring is empty. + expect(peek?.nextSeq).toBe(2); + expect(peek?.ringLength).toBe(0); + }); + + test("seq stays monotonic across mixed replayable/non-replayable events", () => { + const a = mkEvent(); + const b = mkEvent(); + const c = mkEvent(); + const d = mkEvent(); + stampAndBuffer(a); // replayable + stampAndBuffer(b, { replayable: false }); // targeted -- skipped + stampAndBuffer(c); // replayable + stampAndBuffer(d, { replayable: false }); // targeted -- skipped + expect([a.seq, b.seq, c.seq, d.seq]).toEqual([1, 2, 3, 4]); + const peek = _peekStreamForTesting(CONV); + expect(peek?.ringLength).toBe(2); // only a + c in buffer + expect(peek?.oldestSeq).toBe(1); + expect(peek?.newestSeq).toBe(3); + }); + }); + + describe("ring buffer eviction", () => { + test("evicts oldest entries past the 200-event count cap", () => { + for (let i = 0; i < 250; i++) stampAndBuffer(mkEvent()); + const peek = _peekStreamForTesting(CONV); + expect(peek?.ringLength).toBe(200); + // Newest is 250, oldest should be 51 (250 - 200 + 1) + expect(peek?.newestSeq).toBe(250); + expect(peek?.oldestSeq).toBe(51); + }); + + test("evicts past the 256 KB size cap", () => { + // Each event with a large text payload pushes past the limit fast. + const bigText = "x".repeat(8 * 1024); // 8 KB per event + for (let i = 0; i < 60; i++) { + stampAndBuffer( + mkEvent({ + message: { + type: "assistant_text_delta", + conversationId: CONV, + text: bigText, + }, + }), + ); + } + const peek = _peekStreamForTesting(CONV); + expect(peek).not.toBeNull(); + // 60 * ~8KB = ~480KB pushed; ring must have evicted down under 256KB. + expect(peek!.totalSizeBytes).toBeLessThanOrEqual(256 * 1024); + expect(peek!.ringLength).toBeLessThan(60); + }); + + test("evicts past the 30s age cap", async () => { + const originalNow = Date.now; + let fakeNow = 1_000_000; + Date.now = () => fakeNow; + try { + stampAndBuffer(mkEvent()); // emittedAt = 1_000_000 + fakeNow = 1_000_000 + 10_000; + stampAndBuffer(mkEvent()); // emittedAt = 1_010_000 + + // Jump 31s past the first event but keep within window of second. + fakeNow = 1_000_000 + 31_000; + stampAndBuffer(mkEvent()); // triggers eviction sweep on push + + const peek = _peekStreamForTesting(CONV); + // First event is now > 30s old → evicted. Second + third remain. + expect(peek?.ringLength).toBe(2); + expect(peek?.oldestSeq).toBe(2); + expect(peek?.newestSeq).toBe(3); + } finally { + Date.now = originalNow; + } + }); + }); + + describe("getReplayWindow", () => { + test("returns events with seq > lastSeenSeq in order", () => { + const events = Array.from({ length: 5 }, () => mkEvent()); + events.forEach((e) => stampAndBuffer(e)); + const replay = getReplayWindow(CONV, 2); + expect(replay).not.toBeNull(); + expect(replay!.map((e) => e.seq)).toEqual([3, 4, 5]); + }); + + test("returns empty array when lastSeenSeq is current (nothing to replay)", () => { + stampAndBuffer(mkEvent()); + stampAndBuffer(mkEvent()); + const replay = getReplayWindow(CONV, 2); + expect(replay).toEqual([]); + }); + + test("returns null when lastSeenSeq is older than oldest buffered entry", () => { + // Force eviction by pushing past the count cap. + for (let i = 0; i < 250; i++) stampAndBuffer(mkEvent()); + const peek = _peekStreamForTesting(CONV); + expect(peek?.oldestSeq).toBe(51); + // Client claims to have last seen seq=10 — that's far below oldest. + const replay = getReplayWindow(CONV, 10); + expect(replay).toBeNull(); + }); + + test("returns empty array for a conversation with no stream state", () => { + const replay = getReplayWindow("conv_never_streamed", 0); + expect(replay).toEqual([]); + }); + + test("lastSeenSeq exactly one below oldest is a valid replay (no snapshot needed)", () => { + stampAndBuffer(mkEvent()); // seq 1 + stampAndBuffer(mkEvent()); // seq 2 + stampAndBuffer(mkEvent()); // seq 3 + // Client saw nothing → lastSeenSeq=0, oldest=1, replay [1,2,3]. + const replay = getReplayWindow(CONV, 0); + expect(replay).not.toBeNull(); + expect(replay!.map((e) => e.seq)).toEqual([1, 2, 3]); + }); + + test("evicts age-expired entries at read time on idle stream", () => { + const originalNow = Date.now; + let fakeNow = 5_000_000; + Date.now = () => fakeNow; + try { + stampAndBuffer(mkEvent()); // seq 1, emitted at 5_000_000 + stampAndBuffer(mkEvent()); // seq 2, emitted at 5_000_000 + + // No further stampAndBuffer calls. Stream goes idle. Advance + // clock past the 30s age cap. + fakeNow = 5_000_000 + 60_000; + + // Eviction has not run since the last write -- the buffer still + // physically holds [1, 2]. getReplayWindow must sweep first. + const replay = getReplayWindow(CONV, 0); + + // Both events were past their TTL, so eviction drains the ring + // and the call returns [] (no replay possible, no snapshot + // needed either -- client claims they saw nothing and there is + // nothing left). + expect(replay).toEqual([]); + // State entry is dropped after the drain. + expect(_peekStreamForTesting(CONV)).toBeNull(); + } finally { + Date.now = originalNow; + } + }); + + test("read-time eviction preserves the snapshot fallback signal", () => { + const originalNow = Date.now; + let fakeNow = 6_000_000; + Date.now = () => fakeNow; + try { + stampAndBuffer(mkEvent()); // seq 1 + stampAndBuffer(mkEvent()); // seq 2 + + // 40s pass. Both entries are over the age cap. + fakeNow = 6_000_000 + 40_000; + + // Now a fresh event lands -- ring contains only seq 3. + stampAndBuffer(mkEvent()); // seq 3 + // After this write, evict() already ran and dropped the stale + // entries from the write path. Verify that. + const peek = _peekStreamForTesting(CONV); + expect(peek?.ringLength).toBe(1); + expect(peek?.oldestSeq).toBe(3); + + // Client reconnects claiming lastSeenSeq=1. Oldest buffered is + // 3, so 1 < 3 - 1 = 2 -> snapshot fallback (null). + const replay = getReplayWindow(CONV, 1); + expect(replay).toBeNull(); + } finally { + Date.now = originalNow; + } + }); + }); + + describe("clearConversationStream", () => { + test("drops all state for the conversation", () => { + stampAndBuffer(mkEvent()); + stampAndBuffer(mkEvent()); + expect(_peekStreamForTesting(CONV)).not.toBeNull(); + + clearConversationStream(CONV); + + expect(_peekStreamForTesting(CONV)).toBeNull(); + // Subsequent emit starts seq fresh at 1. + const event = mkEvent(); + stampAndBuffer(event); + expect(event.seq).toBe(1); + }); + }); +}); diff --git a/assistant/src/runtime/assistant-event-hub.ts b/assistant/src/runtime/assistant-event-hub.ts index 90153d1d1b4..3918d76b7d6 100644 --- a/assistant/src/runtime/assistant-event-hub.ts +++ b/assistant/src/runtime/assistant-event-hub.ts @@ -43,6 +43,7 @@ import { appendEventToStream } from "../signals/event-stream.js"; import { getLogger } from "../util/logger.js"; import type { AssistantEvent } from "./assistant-event.js"; import { buildAssistantEvent } from "./assistant-event.js"; +import { stampAndBuffer } from "./conversation-stream-state.js"; const log = getLogger("assistant-event-hub"); @@ -604,6 +605,11 @@ export function broadcastMessage( excludeClientId, } : undefined; + // Stamp per-conversation seq and (when replayable) push onto the ring + // buffer. Mutates `event.seq` in place. Targeted/exclusion publishes + // are stamped but not buffered -- replay by `conversationId` alone + // would leak them to subscribers outside their intended delivery set. + stampAndBuffer(event, { replayable: publishOptions == null }); _hubChain = _hubChain .then(() => assistantEventHub.publish(event, publishOptions)) .then(() => { @@ -658,7 +664,6 @@ function resolveCanonicalRequestSourceType( return "channel"; } - /** * Lazily load heavy dependencies and create a canonical guardian request + * bridge for a confirmation_request message. Called fire-and-forget from @@ -732,4 +737,3 @@ async function createCanonicalRequestForConfirmation( ); } } - diff --git a/assistant/src/runtime/conversation-stream-state.ts b/assistant/src/runtime/conversation-stream-state.ts new file mode 100644 index 00000000000..b2579ed0d79 --- /dev/null +++ b/assistant/src/runtime/conversation-stream-state.ts @@ -0,0 +1,187 @@ +/** + * Conversation Stream State -- per-conversation SSE sequence counter and + * ring buffer for `Last-Event-ID` replay (B7 Unit 1). + * + * Every conversation-scoped outbound event picks up a monotonic `seq` + * number from this module. The same event is also pushed onto a bounded + * ring buffer so a reconnecting client can request replay of events the + * daemon emitted while it was disconnected. + * + * Bounds (oldest evicted first; first bound hit wins): + * - Count: 200 events + * - Total size: 256 KB + * - Age: 30 seconds + * + * The ring is in-memory and per-daemon-process. After a daemon restart + * all seqs reset and reconnecting clients fall through to the snapshot + * path (delivered by B7 Unit 2). The ring is sized generously enough + * that a typical refresh round-trip (~1-3s) is well within window. + */ + +import type { AssistantEvent } from "./assistant-event.js"; + +// ── Tunables ───────────────────────────────────────────────────────── + +const RING_COUNT_LIMIT = 200; +const RING_SIZE_LIMIT_BYTES = 256 * 1024; +const RING_AGE_LIMIT_MS = 30_000; + +// ── Types ──────────────────────────────────────────────────────────── + +interface RingEntry { + seq: number; + event: AssistantEvent; + emittedAt: number; + sizeBytes: number; +} + +interface ConversationStreamState { + nextSeq: number; + ring: RingEntry[]; + totalSizeBytes: number; +} + +// ── State ──────────────────────────────────────────────────────────── + +const streams = new Map(); + +function getOrCreate(conversationId: string): ConversationStreamState { + let state = streams.get(conversationId); + if (!state) { + state = { nextSeq: 1, ring: [], totalSizeBytes: 0 }; + streams.set(conversationId, state); + } + return state; +} + +// ── Public API ─────────────────────────────────────────────────────── + +/** + * Assign a monotonic `seq` to a conversation-scoped event and (when + * replayable) push it onto the ring buffer. No-op when + * `event.conversationId` is absent (unscoped broadcasts are never + * replayable). + * + * `options.replayable` should be `false` whenever the event was + * published with any targeting / exclusion modifier + * (`targetCapability`, `targetClientId`, `targetInterfaceId`, + * `excludeClientId`). Such events have a narrower delivery set than the + * conversation subscriber list, so storing them by `conversationId` + * alone would leak them to the wrong subscribers on replay. The seq is + * still stamped so the wire-side ordering stays contiguous; only the + * ring push is skipped. Defaults to `true`. + * + * Mutates `event.seq` in place. + */ +export function stampAndBuffer( + event: AssistantEvent, + options?: { replayable?: boolean }, +): void { + const cid = event.conversationId; + if (cid == null) return; + + const state = getOrCreate(cid); + event.seq = state.nextSeq++; + + if (options?.replayable === false) return; + + // Approximate size by serialized JSON length. This is the same + // bytes-on-wire we'll send, so it tracks ring memory pressure + // closely without a separate measurement pass. + const sizeBytes = JSON.stringify(event).length; + state.ring.push({ seq: event.seq, event, emittedAt: Date.now(), sizeBytes }); + state.totalSizeBytes += sizeBytes; + + evict(state); +} + +/** + * Replay events with `seq > lastSeenSeq` for a given conversation. + * Returns `null` when the requested cursor is older than the oldest + * buffered entry -- callers should fall back to a snapshot resync. + * + * Sweeps age-expired entries at read time so an idle conversation + * cannot serve stale deltas past the 30-second window (eviction + * only runs on `stampAndBuffer`, so without this an idle stream + * would retain its tail until the next write). When the sweep + * drains the ring entirely, the conversation's state entry is + * dropped to keep the global map from growing unboundedly with + * inactive conversations. + */ +export function getReplayWindow( + conversationId: string, + lastSeenSeq: number, +): readonly AssistantEvent[] | null { + const state = streams.get(conversationId); + if (!state) return []; + + evict(state); + + if (state.ring.length === 0) { + streams.delete(conversationId); + return []; + } + + const oldest = state.ring[0]?.seq ?? Infinity; + if (lastSeenSeq < oldest - 1) return null; + + return state.ring + .filter((entry) => entry.seq > lastSeenSeq) + .map((entry) => entry.event); +} + +/** + * Drop all state for a conversation. Currently unused -- the ring + * self-evicts by age -- but exposed for explicit dispose flows + * (e.g. when a conversation is deleted). + */ +export function clearConversationStream(conversationId: string): void { + streams.delete(conversationId); +} + +/** + * Reset all stream state. Test-only. + */ +export function _resetConversationStreamsForTesting(): void { + streams.clear(); +} + +/** + * Read-only inspector for tests. + */ +export function _peekStreamForTesting(conversationId: string): { + nextSeq: number; + ringLength: number; + totalSizeBytes: number; + oldestSeq: number | null; + newestSeq: number | null; +} | null { + const state = streams.get(conversationId); + if (!state) return null; + return { + nextSeq: state.nextSeq, + ringLength: state.ring.length, + totalSizeBytes: state.totalSizeBytes, + oldestSeq: state.ring[0]?.seq ?? null, + newestSeq: state.ring[state.ring.length - 1]?.seq ?? null, + }; +} + +// ── Internals ──────────────────────────────────────────────────────── + +function evict(state: ConversationStreamState): void { + const now = Date.now(); + while (state.ring.length > 0) { + const head = state.ring[0]; + if (head == null) break; + + const overCount = state.ring.length > RING_COUNT_LIMIT; + const overSize = state.totalSizeBytes > RING_SIZE_LIMIT_BYTES; + const overAge = now - head.emittedAt > RING_AGE_LIMIT_MS; + + if (!overCount && !overSize && !overAge) break; + + state.ring.shift(); + state.totalSizeBytes -= head.sizeBytes; + } +} diff --git a/packages/skill-host-contracts/src/assistant-event.ts b/packages/skill-host-contracts/src/assistant-event.ts index d718fefa041..e8d4e6d382c 100644 --- a/packages/skill-host-contracts/src/assistant-event.ts +++ b/packages/skill-host-contracts/src/assistant-event.ts @@ -30,6 +30,13 @@ export interface AssistantEvent { id: string; /** Resolved conversation id when available. */ conversationId?: string; + /** + * Monotonic per-conversation sequence number. Assigned by the daemon at + * publish time for conversation-scoped events; absent for unscoped + * broadcasts. Clients track the highest observed `seq` per conversation + * and pass it back on reconnect to request replay of missed events. + */ + seq?: number; /** ISO-8601 timestamp of when the event was emitted. */ emittedAt: string; /** Outbound message payload. */ @@ -70,6 +77,10 @@ export function buildAssistantEvent( * data: \n * \n * ``` + * + * The SSE `id:` line is the per-event UUID and is intentionally decoupled + * from any replay cursor. Replay-aware consumers read `seq` from the JSON + * payload of the envelope itself. */ export function formatSseFrame(event: AssistantEvent): string { const sanitizedId = event.id.replace(/[\n\r]/g, "");