From 141cca34df621af05db39dee1a73de873fce2cb7 Mon Sep 17 00:00:00 2001 From: Apollo Bot Date: Sat, 30 May 2026 17:15:39 +0000 Subject: [PATCH 1/3] feat(sse): per-conversation seq + ring buffer for Last-Event-ID replay (B7.1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Foundation for resumable SSE on mid-turn refresh. Every conversation-scoped outbound event picks up a monotonic `seq` from a per-conversation counter at broadcast time and is pushed onto a bounded ring buffer (200 events, 256 KB total, 30s age). Eviction is oldest-first; first bound hit wins. The SSE wire `id:` field now carries seq when present, so reconnecting clients send `Last-Event-ID: ` per the EventSource spec. The UUID `event.id` stays in the JSON payload for per-event uniqueness. Unscoped broadcasts (e.g. `conversation_list_invalidated`) fall back to the UUID and are not replayable by design. This is ship unit 1 of 3. Additive only — no client-visible behavior change without B7.2 (reconnect handler) and B7.3 (client `lastSeenSeq`). `getReplayWindow(conversationId, lastSeenSeq)` is exposed for B7.2's reconnect path. Returns `null` when the cursor is older than the oldest buffered entry, signaling the caller to fall back to a snapshot resync. Tests: - 13 unit tests on conversation-stream-state (monotonic seq, per-conversation scope, no-op on unscoped, count/size/age eviction, replay window, snapshot fallback, clear). - 2 new formatSseFrame tests covering seq → SSE id wiring and UUID fallback. - Verified no regressions in assistant-event, hub, SSE-adjacent, and daemon-assistant-events test suites. --- .../src/__tests__/assistant-event.test.ts | 22 +++ .../conversation-stream-state.test.ts | 182 ++++++++++++++++++ assistant/src/runtime/assistant-event-hub.ts | 7 +- .../src/runtime/conversation-stream-state.ts | 160 +++++++++++++++ .../src/assistant-event.ts | 17 +- 5 files changed, 384 insertions(+), 4 deletions(-) create mode 100644 assistant/src/__tests__/conversation-stream-state.test.ts create mode 100644 assistant/src/runtime/conversation-stream-state.ts diff --git a/assistant/src/__tests__/assistant-event.test.ts b/assistant/src/__tests__/assistant-event.test.ts index efda0025b4e..4848e55d648 100644 --- a/assistant/src/__tests__/assistant-event.test.ts +++ b/assistant/src/__tests__/assistant-event.test.ts @@ -111,6 +111,28 @@ describe("formatSseFrame", () => { const frame = formatSseFrame(baseEvent); expect(frame.endsWith("\n\n")).toBe(true); }); + + test("uses event.seq as the SSE id when set (B7 Last-Event-ID)", () => { + const event: AssistantEvent = { + ...baseEvent, + id: "uuid-fallback", + seq: 42, + }; + const frame = formatSseFrame(event); + const idLine = frame.split("\n").find((l) => l.startsWith("id: "))!; + expect(idLine).toBe("id: 42"); + // UUID stays in the JSON payload for per-event uniqueness. + expect(frame).toContain('"id":"uuid-fallback"'); + expect(frame).toContain('"seq":42'); + }); + + test("falls back to event.id when seq is absent (unscoped broadcasts)", () => { + const event: AssistantEvent = { ...baseEvent, id: "uuid-only" }; + expect(event.seq).toBeUndefined(); + const frame = formatSseFrame(event); + const idLine = frame.split("\n").find((l) => l.startsWith("id: "))!; + expect(idLine).toBe("id: uuid-only"); + }); }); // ── Heartbeat tests ─────────────────────────────────────────────────────────── diff --git a/assistant/src/__tests__/conversation-stream-state.test.ts b/assistant/src/__tests__/conversation-stream-state.test.ts new file mode 100644 index 00000000000..e679540c293 --- /dev/null +++ b/assistant/src/__tests__/conversation-stream-state.test.ts @@ -0,0 +1,182 @@ +import { beforeEach, describe, expect, test } from "bun:test"; + +import type { AssistantEvent } from "../runtime/assistant-event.js"; +import { + _peekStreamForTesting, + _resetConversationStreamsForTesting, + clearConversationStream, + getReplayWindow, + stampAndBuffer, +} from "../runtime/conversation-stream-state.js"; + +const CONV = "conv_test"; + +function mkEvent(overrides: Partial = {}): AssistantEvent { + return { + id: `uuid-${Math.random().toString(36).slice(2, 10)}`, + conversationId: CONV, + emittedAt: new Date().toISOString(), + message: { type: "assistant_text_delta", conversationId: CONV, text: "x" }, + ...overrides, + } as AssistantEvent; +} + +describe("conversation-stream-state", () => { + beforeEach(() => { + _resetConversationStreamsForTesting(); + }); + + describe("stampAndBuffer", () => { + test("assigns monotonic seq starting at 1 per conversation", () => { + const a = mkEvent(); + const b = mkEvent(); + const c = mkEvent(); + stampAndBuffer(a); + stampAndBuffer(b); + stampAndBuffer(c); + expect(a.seq).toBe(1); + expect(b.seq).toBe(2); + expect(c.seq).toBe(3); + }); + + test("seq is per-conversation, not global", () => { + const a = mkEvent({ conversationId: "conv_a" }); + const b = mkEvent({ conversationId: "conv_b" }); + const a2 = mkEvent({ conversationId: "conv_a" }); + stampAndBuffer(a); + stampAndBuffer(b); + stampAndBuffer(a2); + expect(a.seq).toBe(1); + expect(b.seq).toBe(1); // independent counter + expect(a2.seq).toBe(2); + }); + + test("no-op when conversationId is absent (unscoped broadcasts)", () => { + const event = mkEvent({ conversationId: undefined }); + stampAndBuffer(event); + expect(event.seq).toBeUndefined(); + }); + + test("pushes event onto ring buffer", () => { + stampAndBuffer(mkEvent()); + stampAndBuffer(mkEvent()); + const peek = _peekStreamForTesting(CONV); + expect(peek?.ringLength).toBe(2); + expect(peek?.oldestSeq).toBe(1); + expect(peek?.newestSeq).toBe(2); + }); + }); + + describe("ring buffer eviction", () => { + test("evicts oldest entries past the 200-event count cap", () => { + for (let i = 0; i < 250; i++) stampAndBuffer(mkEvent()); + const peek = _peekStreamForTesting(CONV); + expect(peek?.ringLength).toBe(200); + // Newest is 250, oldest should be 51 (250 - 200 + 1) + expect(peek?.newestSeq).toBe(250); + expect(peek?.oldestSeq).toBe(51); + }); + + test("evicts past the 256 KB size cap", () => { + // Each event with a large text payload pushes past the limit fast. + const bigText = "x".repeat(8 * 1024); // 8 KB per event + for (let i = 0; i < 60; i++) { + stampAndBuffer( + mkEvent({ + message: { + type: "assistant_text_delta", + conversationId: CONV, + text: bigText, + }, + }), + ); + } + const peek = _peekStreamForTesting(CONV); + expect(peek).not.toBeNull(); + // 60 * ~8KB = ~480KB pushed; ring must have evicted down under 256KB. + expect(peek!.totalSizeBytes).toBeLessThanOrEqual(256 * 1024); + expect(peek!.ringLength).toBeLessThan(60); + }); + + test("evicts past the 30s age cap", async () => { + const originalNow = Date.now; + let fakeNow = 1_000_000; + Date.now = () => fakeNow; + try { + stampAndBuffer(mkEvent()); // emittedAt = 1_000_000 + fakeNow = 1_000_000 + 10_000; + stampAndBuffer(mkEvent()); // emittedAt = 1_010_000 + + // Jump 31s past the first event but keep within window of second. + fakeNow = 1_000_000 + 31_000; + stampAndBuffer(mkEvent()); // triggers eviction sweep on push + + const peek = _peekStreamForTesting(CONV); + // First event is now > 30s old → evicted. Second + third remain. + expect(peek?.ringLength).toBe(2); + expect(peek?.oldestSeq).toBe(2); + expect(peek?.newestSeq).toBe(3); + } finally { + Date.now = originalNow; + } + }); + }); + + describe("getReplayWindow", () => { + test("returns events with seq > lastSeenSeq in order", () => { + const events = Array.from({ length: 5 }, () => mkEvent()); + events.forEach(stampAndBuffer); + const replay = getReplayWindow(CONV, 2); + expect(replay).not.toBeNull(); + expect(replay!.map((e) => e.seq)).toEqual([3, 4, 5]); + }); + + test("returns empty array when lastSeenSeq is current (nothing to replay)", () => { + stampAndBuffer(mkEvent()); + stampAndBuffer(mkEvent()); + const replay = getReplayWindow(CONV, 2); + expect(replay).toEqual([]); + }); + + test("returns null when lastSeenSeq is older than oldest buffered entry", () => { + // Force eviction by pushing past the count cap. + for (let i = 0; i < 250; i++) stampAndBuffer(mkEvent()); + const peek = _peekStreamForTesting(CONV); + expect(peek?.oldestSeq).toBe(51); + // Client claims to have last seen seq=10 — that's far below oldest. + const replay = getReplayWindow(CONV, 10); + expect(replay).toBeNull(); + }); + + test("returns empty array for a conversation with no stream state", () => { + const replay = getReplayWindow("conv_never_streamed", 0); + expect(replay).toEqual([]); + }); + + test("lastSeenSeq exactly one below oldest is a valid replay (no snapshot needed)", () => { + stampAndBuffer(mkEvent()); // seq 1 + stampAndBuffer(mkEvent()); // seq 2 + stampAndBuffer(mkEvent()); // seq 3 + // Client saw nothing → lastSeenSeq=0, oldest=1, replay [1,2,3]. + const replay = getReplayWindow(CONV, 0); + expect(replay).not.toBeNull(); + expect(replay!.map((e) => e.seq)).toEqual([1, 2, 3]); + }); + }); + + describe("clearConversationStream", () => { + test("drops all state for the conversation", () => { + stampAndBuffer(mkEvent()); + stampAndBuffer(mkEvent()); + expect(_peekStreamForTesting(CONV)).not.toBeNull(); + + clearConversationStream(CONV); + + expect(_peekStreamForTesting(CONV)).toBeNull(); + // Subsequent emit starts seq fresh at 1. + const event = mkEvent(); + stampAndBuffer(event); + expect(event.seq).toBe(1); + }); + }); +}); diff --git a/assistant/src/runtime/assistant-event-hub.ts b/assistant/src/runtime/assistant-event-hub.ts index 90153d1d1b4..e0ffdc6ae63 100644 --- a/assistant/src/runtime/assistant-event-hub.ts +++ b/assistant/src/runtime/assistant-event-hub.ts @@ -43,6 +43,7 @@ import { appendEventToStream } from "../signals/event-stream.js"; import { getLogger } from "../util/logger.js"; import type { AssistantEvent } from "./assistant-event.js"; import { buildAssistantEvent } from "./assistant-event.js"; +import { stampAndBuffer } from "./conversation-stream-state.js"; const log = getLogger("assistant-event-hub"); @@ -578,6 +579,10 @@ export function broadcastMessage( ? undefined : resolvedConversationId; const event = buildAssistantEvent(msg, scopedConversationId); + // Stamp per-conversation seq and push onto the ring buffer for + // Last-Event-ID replay (B7). Mutates `event.seq` in place. No-op for + // unscoped broadcasts (no conversationId). + stampAndBuffer(event); const targetCapability = capabilityForMessageType(msg.type); // Self-echo suppression: a `sync_changed` carrying an `originClientId` // means a specific client just mutated the resource. The hub must not @@ -658,7 +663,6 @@ function resolveCanonicalRequestSourceType( return "channel"; } - /** * Lazily load heavy dependencies and create a canonical guardian request + * bridge for a confirmation_request message. Called fire-and-forget from @@ -732,4 +736,3 @@ async function createCanonicalRequestForConfirmation( ); } } - diff --git a/assistant/src/runtime/conversation-stream-state.ts b/assistant/src/runtime/conversation-stream-state.ts new file mode 100644 index 00000000000..de678ceffd7 --- /dev/null +++ b/assistant/src/runtime/conversation-stream-state.ts @@ -0,0 +1,160 @@ +/** + * Conversation Stream State -- per-conversation SSE sequence counter and + * ring buffer for `Last-Event-ID` replay (B7 Unit 1). + * + * Every conversation-scoped outbound event picks up a monotonic `seq` + * number from this module. The same event is also pushed onto a bounded + * ring buffer so a reconnecting client can request replay of events the + * daemon emitted while it was disconnected. + * + * Bounds (oldest evicted first; first bound hit wins): + * - Count: 200 events + * - Total size: 256 KB + * - Age: 30 seconds + * + * The ring is in-memory and per-daemon-process. After a daemon restart + * all seqs reset and reconnecting clients fall through to the snapshot + * path (delivered by B7 Unit 2). The ring is sized generously enough + * that a typical refresh round-trip (~1-3s) is well within window. + */ + +import type { AssistantEvent } from "./assistant-event.js"; + +// ── Tunables ───────────────────────────────────────────────────────── + +const RING_COUNT_LIMIT = 200; +const RING_SIZE_LIMIT_BYTES = 256 * 1024; +const RING_AGE_LIMIT_MS = 30_000; + +// ── Types ──────────────────────────────────────────────────────────── + +interface RingEntry { + seq: number; + event: AssistantEvent; + emittedAt: number; + sizeBytes: number; +} + +interface ConversationStreamState { + nextSeq: number; + ring: RingEntry[]; + totalSizeBytes: number; +} + +// ── State ──────────────────────────────────────────────────────────── + +const streams = new Map(); + +function getOrCreate(conversationId: string): ConversationStreamState { + let state = streams.get(conversationId); + if (!state) { + state = { nextSeq: 1, ring: [], totalSizeBytes: 0 }; + streams.set(conversationId, state); + } + return state; +} + +// ── Public API ─────────────────────────────────────────────────────── + +/** + * Assign a monotonic `seq` to a conversation-scoped event and push it + * onto the ring buffer. No-op when `event.conversationId` is absent + * (unscoped broadcasts are not replayable). + * + * Mutates `event.seq` in place so downstream `formatSseFrame` picks + * it up as the SSE wire `id:`. + */ +export function stampAndBuffer(event: AssistantEvent): void { + const cid = event.conversationId; + if (cid == null) return; + + const state = getOrCreate(cid); + event.seq = state.nextSeq++; + + // Approximate size by serialized JSON length. This is the same + // bytes-on-wire we'll send, so it tracks ring memory pressure + // closely without a separate measurement pass. + const sizeBytes = JSON.stringify(event).length; + state.ring.push({ seq: event.seq, event, emittedAt: Date.now(), sizeBytes }); + state.totalSizeBytes += sizeBytes; + + evict(state); +} + +/** + * Replay events with `seq > lastSeenSeq` for a given conversation. + * Returns `null` when the requested cursor is older than the oldest + * buffered entry -- callers should fall back to a snapshot resync. + * + * Used by B7 Unit 2's reconnect handler. + */ +export function getReplayWindow( + conversationId: string, + lastSeenSeq: number, +): readonly AssistantEvent[] | null { + const state = streams.get(conversationId); + if (!state || state.ring.length === 0) return []; + + const oldest = state.ring[0]?.seq ?? Infinity; + if (lastSeenSeq < oldest - 1) return null; + + return state.ring + .filter((entry) => entry.seq > lastSeenSeq) + .map((entry) => entry.event); +} + +/** + * Drop all state for a conversation. Currently unused -- the ring + * self-evicts by age -- but exposed for explicit dispose flows + * (e.g. when a conversation is deleted). + */ +export function clearConversationStream(conversationId: string): void { + streams.delete(conversationId); +} + +/** + * Reset all stream state. Test-only. + */ +export function _resetConversationStreamsForTesting(): void { + streams.clear(); +} + +/** + * Read-only inspector for tests. + */ +export function _peekStreamForTesting(conversationId: string): { + nextSeq: number; + ringLength: number; + totalSizeBytes: number; + oldestSeq: number | null; + newestSeq: number | null; +} | null { + const state = streams.get(conversationId); + if (!state) return null; + return { + nextSeq: state.nextSeq, + ringLength: state.ring.length, + totalSizeBytes: state.totalSizeBytes, + oldestSeq: state.ring[0]?.seq ?? null, + newestSeq: state.ring[state.ring.length - 1]?.seq ?? null, + }; +} + +// ── Internals ──────────────────────────────────────────────────────── + +function evict(state: ConversationStreamState): void { + const now = Date.now(); + while (state.ring.length > 0) { + const head = state.ring[0]; + if (head == null) break; + + const overCount = state.ring.length > RING_COUNT_LIMIT; + const overSize = state.totalSizeBytes > RING_SIZE_LIMIT_BYTES; + const overAge = now - head.emittedAt > RING_AGE_LIMIT_MS; + + if (!overCount && !overSize && !overAge) break; + + state.ring.shift(); + state.totalSizeBytes -= head.sizeBytes; + } +} diff --git a/packages/skill-host-contracts/src/assistant-event.ts b/packages/skill-host-contracts/src/assistant-event.ts index d718fefa041..db90ef3c344 100644 --- a/packages/skill-host-contracts/src/assistant-event.ts +++ b/packages/skill-host-contracts/src/assistant-event.ts @@ -30,6 +30,13 @@ export interface AssistantEvent { id: string; /** Resolved conversation id when available. */ conversationId?: string; + /** + * Monotonic per-conversation sequence number. Assigned by the daemon at + * publish time for conversation-scoped events; absent for unscoped + * broadcasts. Used as the SSE `id:` field so reconnecting clients can + * send `Last-Event-ID: ` to request replay of missed events. + */ + seq?: number; /** ISO-8601 timestamp of when the event was emitted. */ emittedAt: string; /** Outbound message payload. */ @@ -66,13 +73,19 @@ export function buildAssistantEvent( * * ``` * event: assistant_event\n - * id: \n + * id: \n * data: \n * \n * ``` + * + * When `event.seq` is set (conversation-scoped events) it is used as the SSE + * `id:` field so native EventSource clients send `Last-Event-ID: ` on + * reconnect, allowing per-conversation replay. Unscoped events fall back to + * the UUID `event.id` and are not replayable by design. */ export function formatSseFrame(event: AssistantEvent): string { - const sanitizedId = event.id.replace(/[\n\r]/g, ""); + const rawId = event.seq != null ? String(event.seq) : event.id; + const sanitizedId = rawId.replace(/[\n\r]/g, ""); const data = JSON.stringify(event); return `event: assistant_event\nid: ${sanitizedId}\ndata: ${data}\n\n`; } From 91bc016fc929f1fb3933af9a2ff43fa014ea6754 Mon Sep 17 00:00:00 2001 From: Apollo Bot Date: Sat, 30 May 2026 18:47:51 +0000 Subject: [PATCH 2/3] fix(sse): keep seq on envelope only; filter targeted events; evict on read Review feedback on #32676: 1. Revert formatSseFrame change. SSE wire id stays the per-event UUID. The replay cursor is decoupled from the SSE protocol entirely; it lives on the envelope JSON only and clients read it from the parsed event payload. B7.2 will pass it back via a request param, not Last-Event-ID. (per review: high-risk protocol.) 2. Targeted-delivery filter for the ring buffer. Events published with any of targetCapability, targetClientId, targetInterfaceId, or excludeClientId have a narrower delivery set than the conversation subscriber list. Replaying them by conversationId alone would leak host-proxy requests / self-echo suppressions to subscribers that were never meant to receive them. stampAndBuffer now takes { replayable }; seq is still stamped on every conversation-scoped event (so wire ordering stays contiguous) but the ring push is skipped when replayable is false. (per review.) 3. Read-time eviction in getReplayWindow. Eviction only ran on stampAndBuffer, so an idle stream retained its tail past the 30s age cap. getReplayWindow now sweeps age-expired entries before the cursor check and drops the conversation's state entry when the ring drains, keeping the global map from growing unboundedly with inactive conversations. (per review.) 4. Two docstring trims from the suggested-change diffs. Tests: - 4 new conversation-stream-state tests (replayable=false skips ring but stamps seq; seq monotonic across mixed replayable/non-replayable; age eviction at read on idle stream; snapshot-fallback signal still fires after read-time eviction). - formatSseFrame test repurposed: now asserts seq lands in the JSON envelope payload but NEVER in the SSE wire id. - 17/17 stream-state, 11/11 assistant-event, no regressions across 6 hub/SSE-adjacent suites. --- .../src/__tests__/assistant-event.test.ts | 19 ++--- .../conversation-stream-state.test.ts | 82 +++++++++++++++++++ assistant/src/runtime/assistant-event-hub.ts | 9 +- .../src/runtime/conversation-stream-state.ts | 43 ++++++++-- .../src/assistant-event.ts | 16 ++-- 5 files changed, 135 insertions(+), 34 deletions(-) diff --git a/assistant/src/__tests__/assistant-event.test.ts b/assistant/src/__tests__/assistant-event.test.ts index 4848e55d648..5cbb6838c25 100644 --- a/assistant/src/__tests__/assistant-event.test.ts +++ b/assistant/src/__tests__/assistant-event.test.ts @@ -112,27 +112,20 @@ describe("formatSseFrame", () => { expect(frame.endsWith("\n\n")).toBe(true); }); - test("uses event.seq as the SSE id when set (B7 Last-Event-ID)", () => { + test("seq lands in the JSON payload (envelope) when set, never in SSE id", () => { const event: AssistantEvent = { ...baseEvent, - id: "uuid-fallback", + id: "uuid-event-id", seq: 42, }; const frame = formatSseFrame(event); + // SSE wire id is always the UUID -- replay cursor is decoupled from + // the SSE protocol and lives on the envelope only. const idLine = frame.split("\n").find((l) => l.startsWith("id: "))!; - expect(idLine).toBe("id: 42"); - // UUID stays in the JSON payload for per-event uniqueness. - expect(frame).toContain('"id":"uuid-fallback"'); + expect(idLine).toBe("id: uuid-event-id"); + // seq is on the envelope; consumers read it from the JSON payload. expect(frame).toContain('"seq":42'); }); - - test("falls back to event.id when seq is absent (unscoped broadcasts)", () => { - const event: AssistantEvent = { ...baseEvent, id: "uuid-only" }; - expect(event.seq).toBeUndefined(); - const frame = formatSseFrame(event); - const idLine = frame.split("\n").find((l) => l.startsWith("id: "))!; - expect(idLine).toBe("id: uuid-only"); - }); }); // ── Heartbeat tests ─────────────────────────────────────────────────────────── diff --git a/assistant/src/__tests__/conversation-stream-state.test.ts b/assistant/src/__tests__/conversation-stream-state.test.ts index e679540c293..a2b958072de 100644 --- a/assistant/src/__tests__/conversation-stream-state.test.ts +++ b/assistant/src/__tests__/conversation-stream-state.test.ts @@ -65,6 +65,32 @@ describe("conversation-stream-state", () => { expect(peek?.oldestSeq).toBe(1); expect(peek?.newestSeq).toBe(2); }); + + test("replayable: false stamps seq but skips ring push", () => { + const targeted = mkEvent(); + stampAndBuffer(targeted, { replayable: false }); + expect(targeted.seq).toBe(1); // still stamped for wire-side ordering + const peek = _peekStreamForTesting(CONV); + // State exists (we created it to assign seq) but ring is empty. + expect(peek?.nextSeq).toBe(2); + expect(peek?.ringLength).toBe(0); + }); + + test("seq stays monotonic across mixed replayable/non-replayable events", () => { + const a = mkEvent(); + const b = mkEvent(); + const c = mkEvent(); + const d = mkEvent(); + stampAndBuffer(a); // replayable + stampAndBuffer(b, { replayable: false }); // targeted -- skipped + stampAndBuffer(c); // replayable + stampAndBuffer(d, { replayable: false }); // targeted -- skipped + expect([a.seq, b.seq, c.seq, d.seq]).toEqual([1, 2, 3, 4]); + const peek = _peekStreamForTesting(CONV); + expect(peek?.ringLength).toBe(2); // only a + c in buffer + expect(peek?.oldestSeq).toBe(1); + expect(peek?.newestSeq).toBe(3); + }); }); describe("ring buffer eviction", () => { @@ -162,6 +188,62 @@ describe("conversation-stream-state", () => { expect(replay).not.toBeNull(); expect(replay!.map((e) => e.seq)).toEqual([1, 2, 3]); }); + + test("evicts age-expired entries at read time on idle stream", () => { + const originalNow = Date.now; + let fakeNow = 5_000_000; + Date.now = () => fakeNow; + try { + stampAndBuffer(mkEvent()); // seq 1, emitted at 5_000_000 + stampAndBuffer(mkEvent()); // seq 2, emitted at 5_000_000 + + // No further stampAndBuffer calls. Stream goes idle. Advance + // clock past the 30s age cap. + fakeNow = 5_000_000 + 60_000; + + // Eviction has not run since the last write -- the buffer still + // physically holds [1, 2]. getReplayWindow must sweep first. + const replay = getReplayWindow(CONV, 0); + + // Both events were past their TTL, so eviction drains the ring + // and the call returns [] (no replay possible, no snapshot + // needed either -- client claims they saw nothing and there is + // nothing left). + expect(replay).toEqual([]); + // State entry is dropped after the drain. + expect(_peekStreamForTesting(CONV)).toBeNull(); + } finally { + Date.now = originalNow; + } + }); + + test("read-time eviction preserves the snapshot fallback signal", () => { + const originalNow = Date.now; + let fakeNow = 6_000_000; + Date.now = () => fakeNow; + try { + stampAndBuffer(mkEvent()); // seq 1 + stampAndBuffer(mkEvent()); // seq 2 + + // 40s pass. Both entries are over the age cap. + fakeNow = 6_000_000 + 40_000; + + // Now a fresh event lands -- ring contains only seq 3. + stampAndBuffer(mkEvent()); // seq 3 + // After this write, evict() already ran and dropped the stale + // entries from the write path. Verify that. + const peek = _peekStreamForTesting(CONV); + expect(peek?.ringLength).toBe(1); + expect(peek?.oldestSeq).toBe(3); + + // Client reconnects claiming lastSeenSeq=1. Oldest buffered is + // 3, so 1 < 3 - 1 = 2 -> snapshot fallback (null). + const replay = getReplayWindow(CONV, 1); + expect(replay).toBeNull(); + } finally { + Date.now = originalNow; + } + }); }); describe("clearConversationStream", () => { diff --git a/assistant/src/runtime/assistant-event-hub.ts b/assistant/src/runtime/assistant-event-hub.ts index e0ffdc6ae63..3918d76b7d6 100644 --- a/assistant/src/runtime/assistant-event-hub.ts +++ b/assistant/src/runtime/assistant-event-hub.ts @@ -579,10 +579,6 @@ export function broadcastMessage( ? undefined : resolvedConversationId; const event = buildAssistantEvent(msg, scopedConversationId); - // Stamp per-conversation seq and push onto the ring buffer for - // Last-Event-ID replay (B7). Mutates `event.seq` in place. No-op for - // unscoped broadcasts (no conversationId). - stampAndBuffer(event); const targetCapability = capabilityForMessageType(msg.type); // Self-echo suppression: a `sync_changed` carrying an `originClientId` // means a specific client just mutated the resource. The hub must not @@ -609,6 +605,11 @@ export function broadcastMessage( excludeClientId, } : undefined; + // Stamp per-conversation seq and (when replayable) push onto the ring + // buffer. Mutates `event.seq` in place. Targeted/exclusion publishes + // are stamped but not buffered -- replay by `conversationId` alone + // would leak them to subscribers outside their intended delivery set. + stampAndBuffer(event, { replayable: publishOptions == null }); _hubChain = _hubChain .then(() => assistantEventHub.publish(event, publishOptions)) .then(() => { diff --git a/assistant/src/runtime/conversation-stream-state.ts b/assistant/src/runtime/conversation-stream-state.ts index de678ceffd7..b2579ed0d79 100644 --- a/assistant/src/runtime/conversation-stream-state.ts +++ b/assistant/src/runtime/conversation-stream-state.ts @@ -57,20 +57,34 @@ function getOrCreate(conversationId: string): ConversationStreamState { // ── Public API ─────────────────────────────────────────────────────── /** - * Assign a monotonic `seq` to a conversation-scoped event and push it - * onto the ring buffer. No-op when `event.conversationId` is absent - * (unscoped broadcasts are not replayable). + * Assign a monotonic `seq` to a conversation-scoped event and (when + * replayable) push it onto the ring buffer. No-op when + * `event.conversationId` is absent (unscoped broadcasts are never + * replayable). * - * Mutates `event.seq` in place so downstream `formatSseFrame` picks - * it up as the SSE wire `id:`. + * `options.replayable` should be `false` whenever the event was + * published with any targeting / exclusion modifier + * (`targetCapability`, `targetClientId`, `targetInterfaceId`, + * `excludeClientId`). Such events have a narrower delivery set than the + * conversation subscriber list, so storing them by `conversationId` + * alone would leak them to the wrong subscribers on replay. The seq is + * still stamped so the wire-side ordering stays contiguous; only the + * ring push is skipped. Defaults to `true`. + * + * Mutates `event.seq` in place. */ -export function stampAndBuffer(event: AssistantEvent): void { +export function stampAndBuffer( + event: AssistantEvent, + options?: { replayable?: boolean }, +): void { const cid = event.conversationId; if (cid == null) return; const state = getOrCreate(cid); event.seq = state.nextSeq++; + if (options?.replayable === false) return; + // Approximate size by serialized JSON length. This is the same // bytes-on-wire we'll send, so it tracks ring memory pressure // closely without a separate measurement pass. @@ -86,14 +100,27 @@ export function stampAndBuffer(event: AssistantEvent): void { * Returns `null` when the requested cursor is older than the oldest * buffered entry -- callers should fall back to a snapshot resync. * - * Used by B7 Unit 2's reconnect handler. + * Sweeps age-expired entries at read time so an idle conversation + * cannot serve stale deltas past the 30-second window (eviction + * only runs on `stampAndBuffer`, so without this an idle stream + * would retain its tail until the next write). When the sweep + * drains the ring entirely, the conversation's state entry is + * dropped to keep the global map from growing unboundedly with + * inactive conversations. */ export function getReplayWindow( conversationId: string, lastSeenSeq: number, ): readonly AssistantEvent[] | null { const state = streams.get(conversationId); - if (!state || state.ring.length === 0) return []; + if (!state) return []; + + evict(state); + + if (state.ring.length === 0) { + streams.delete(conversationId); + return []; + } const oldest = state.ring[0]?.seq ?? Infinity; if (lastSeenSeq < oldest - 1) return null; diff --git a/packages/skill-host-contracts/src/assistant-event.ts b/packages/skill-host-contracts/src/assistant-event.ts index db90ef3c344..e8d4e6d382c 100644 --- a/packages/skill-host-contracts/src/assistant-event.ts +++ b/packages/skill-host-contracts/src/assistant-event.ts @@ -33,8 +33,8 @@ export interface AssistantEvent { /** * Monotonic per-conversation sequence number. Assigned by the daemon at * publish time for conversation-scoped events; absent for unscoped - * broadcasts. Used as the SSE `id:` field so reconnecting clients can - * send `Last-Event-ID: ` to request replay of missed events. + * broadcasts. Clients track the highest observed `seq` per conversation + * and pass it back on reconnect to request replay of missed events. */ seq?: number; /** ISO-8601 timestamp of when the event was emitted. */ @@ -73,19 +73,17 @@ export function buildAssistantEvent( * * ``` * event: assistant_event\n - * id: \n + * id: \n * data: \n * \n * ``` * - * When `event.seq` is set (conversation-scoped events) it is used as the SSE - * `id:` field so native EventSource clients send `Last-Event-ID: ` on - * reconnect, allowing per-conversation replay. Unscoped events fall back to - * the UUID `event.id` and are not replayable by design. + * The SSE `id:` line is the per-event UUID and is intentionally decoupled + * from any replay cursor. Replay-aware consumers read `seq` from the JSON + * payload of the envelope itself. */ export function formatSseFrame(event: AssistantEvent): string { - const rawId = event.seq != null ? String(event.seq) : event.id; - const sanitizedId = rawId.replace(/[\n\r]/g, ""); + const sanitizedId = event.id.replace(/[\n\r]/g, ""); const data = JSON.stringify(event); return `event: assistant_event\nid: ${sanitizedId}\ndata: ${data}\n\n`; } From 497d14d297867579d2ff344474171dbcbd136c56 Mon Sep 17 00:00:00 2001 From: Apollo Bot Date: Sat, 30 May 2026 18:56:30 +0000 Subject: [PATCH 3/3] fix(test): wrap stampAndBuffer in arrow for forEach forEach's (value, index, array) signature no longer unifies with stampAndBuffer's new (event, options?) signature -- TS rightfully complains about the index: number being passed as the options arg. Wrap in an arrow that pins to the single-arg overload. --- assistant/src/__tests__/conversation-stream-state.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/assistant/src/__tests__/conversation-stream-state.test.ts b/assistant/src/__tests__/conversation-stream-state.test.ts index a2b958072de..951047cdf3a 100644 --- a/assistant/src/__tests__/conversation-stream-state.test.ts +++ b/assistant/src/__tests__/conversation-stream-state.test.ts @@ -151,7 +151,7 @@ describe("conversation-stream-state", () => { describe("getReplayWindow", () => { test("returns events with seq > lastSeenSeq in order", () => { const events = Array.from({ length: 5 }, () => mkEvent()); - events.forEach(stampAndBuffer); + events.forEach((e) => stampAndBuffer(e)); const replay = getReplayWindow(CONV, 2); expect(replay).not.toBeNull(); expect(replay!.map((e) => e.seq)).toEqual([3, 4, 5]);