feat(sse): per-conversation seq + ring buffer for Last-Event-ID replay (B7.1)#32676
Conversation
…y (B7.1) Foundation for resumable SSE on mid-turn refresh. Every conversation-scoped outbound event picks up a monotonic `seq` from a per-conversation counter at broadcast time and is pushed onto a bounded ring buffer (200 events, 256 KB total, 30s age). Eviction is oldest-first; first bound hit wins. The SSE wire `id:` field now carries seq when present, so reconnecting clients send `Last-Event-ID: <seq>` per the EventSource spec. The UUID `event.id` stays in the JSON payload for per-event uniqueness. Unscoped broadcasts (e.g. `conversation_list_invalidated`) fall back to the UUID and are not replayable by design. This is ship unit 1 of 3. Additive only — no client-visible behavior change without B7.2 (reconnect handler) and B7.3 (client `lastSeenSeq`). `getReplayWindow(conversationId, lastSeenSeq)` is exposed for B7.2's reconnect path. Returns `null` when the cursor is older than the oldest buffered entry, signaling the caller to fall back to a snapshot resync. Tests: - 13 unit tests on conversation-stream-state (monotonic seq, per-conversation scope, no-op on unscoped, count/size/age eviction, replay window, snapshot fallback, clear). - 2 new formatSseFrame tests covering seq → SSE id wiring and UUID fallback. - Verified no regressions in assistant-event, hub, SSE-adjacent, and daemon-assistant-events test suites.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 141cca34df
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| // bytes-on-wire we'll send, so it tracks ring memory pressure | ||
| // closely without a separate measurement pass. | ||
| const sizeBytes = JSON.stringify(event).length; | ||
| state.ring.push({ seq: event.seq, event, emittedAt: Date.now(), sizeBytes }); |
There was a problem hiding this comment.
Preserve targeted delivery when buffering replays
When a conversation-scoped event is targeted (for example host_browser_request is emitted with targetClientId in assistant/src/daemon/host-browser-proxy.ts:241-245, and host-proxy types are capability-filtered in the hub), this stores only the bare AssistantEvent. getReplayWindow() can then return it by conversationId alone, with no targetClientId/capability/excludeClientId metadata for the reconnect handler to apply, so B7.2 would replay requests or self-excluded events to subscribers that were not meant to receive them. Please either skip non-broadcast messages from the replay buffer or store enough publish metadata to enforce the same delivery filters during replay.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Good catch. stampAndBuffer now takes { replayable } and only pushes onto the ring when the publish has no targeting/exclusion modifier (targetCapability, targetClientId, targetInterfaceId, excludeClientId). broadcastMessage derives this from the assembled publishOptions and passes it through. Targeted events still get a seq stamp (so wire ordering stays contiguous from the client's perspective) but they don't enter the ring -- which means B7.2's replay path will never re-deliver a host-proxy request or a self-echo-suppressed sync_changed to subscribers outside the original delivery set. Covered by two new tests in conversation-stream-state.test.ts (replayable-false skips ring; seq monotonic across mixed replayable/non-replayable). Commit 91bc016.
| const state = streams.get(conversationId); | ||
| if (!state || state.ring.length === 0) return []; |
There was a problem hiding this comment.
Evict expired replay entries before serving reads
If a conversation goes idle, evict() is never called again after the last stampAndBuffer(), so getReplayWindow() can replay events far older than the documented 30-second window and the map retains up to the size/count cap for every inactive conversation indefinitely. For example, a client reconnecting minutes later with lastSeenSeq = 0 will receive stale deltas instead of the intended snapshot fallback. Run age eviction on read (before the oldest-seq check) or otherwise schedule cleanup for idle streams.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Fixed. getReplayWindow now runs evict(state) before the cursor check, so an idle stream cannot serve deltas older than the 30s age cap regardless of how long it's been since the last write. When the sweep drains the ring entirely, the conversation's map entry is dropped so the global streams map doesn't grow unboundedly with inactive conversations. Two new tests cover this: evicts age-expired entries at read time on idle stream (assert empty replay + state entry deleted after a 60s idle) and read-time eviction preserves the snapshot fallback signal (eviction at write time also clears stale tail, snapshot fallback still fires for cursors older than the survivor). Commit 91bc016.
| // Stamp per-conversation seq and push onto the ring buffer for | ||
| // Last-Event-ID replay (B7). Mutates `event.seq` in place. No-op for | ||
| // unscoped broadcasts (no conversationId). |
There was a problem hiding this comment.
| // Stamp per-conversation seq and push onto the ring buffer for | |
| // Last-Event-ID replay (B7). Mutates `event.seq` in place. No-op for | |
| // unscoped broadcasts (no conversationId). | |
| // Stamp per-conversation seq and push onto the ring buffer for | |
| // Last-Event-ID replay. Mutates `event.seq` in place. No-op for | |
| // unscoped broadcasts (no conversationId). |
| * Replay events with `seq > lastSeenSeq` for a given conversation. | ||
| * Returns `null` when the requested cursor is older than the oldest | ||
| * buffered entry -- callers should fall back to a snapshot resync. | ||
| * | ||
| * Used by B7 Unit 2's reconnect handler. |
There was a problem hiding this comment.
| * Replay events with `seq > lastSeenSeq` for a given conversation. | |
| * Returns `null` when the requested cursor is older than the oldest | |
| * buffered entry -- callers should fall back to a snapshot resync. | |
| * | |
| * Used by B7 Unit 2's reconnect handler. | |
| * Replay events with `seq > lastSeenSeq` for a given conversation. | |
| * Returns `null` when the requested cursor is older than the oldest | |
| * buffered entry -- callers should fall back to a snapshot resync. |
| * @param message The outbound message payload. | ||
| * @param conversationId Optional conversation id -- pass when known. | ||
| */ | ||
| export function buildAssistantEvent<TMessage>( |
There was a problem hiding this comment.
Let's add the seq to this envelope (as noted above) instead of touching SSE framing's id, which is a high risk protocol
There was a problem hiding this comment.
Done. Reverted formatSseFrame to UUID-only on the wire — the seq stays exclusively on the envelope (JSON payload). Replay-aware consumers parse event.seq from the envelope on receive. B7.2 will accept the cursor via a request param on /events (?lastSeenSeq=N), not via Last-Event-ID. Also dropped the Last-Event-ID language from the seq field doc. Commit 91bc016.
… read Review feedback on #32676: 1. Revert formatSseFrame change. SSE wire id stays the per-event UUID. The replay cursor is decoupled from the SSE protocol entirely; it lives on the envelope JSON only and clients read it from the parsed event payload. B7.2 will pass it back via a request param, not Last-Event-ID. (per review: high-risk protocol.) 2. Targeted-delivery filter for the ring buffer. Events published with any of targetCapability, targetClientId, targetInterfaceId, or excludeClientId have a narrower delivery set than the conversation subscriber list. Replaying them by conversationId alone would leak host-proxy requests / self-echo suppressions to subscribers that were never meant to receive them. stampAndBuffer now takes { replayable }; seq is still stamped on every conversation-scoped event (so wire ordering stays contiguous) but the ring push is skipped when replayable is false. (per review.) 3. Read-time eviction in getReplayWindow. Eviction only ran on stampAndBuffer, so an idle stream retained its tail past the 30s age cap. getReplayWindow now sweeps age-expired entries before the cursor check and drops the conversation's state entry when the ring drains, keeping the global map from growing unboundedly with inactive conversations. (per review.) 4. Two docstring trims from the suggested-change diffs. Tests: - 4 new conversation-stream-state tests (replayable=false skips ring but stamps seq; seq monotonic across mixed replayable/non-replayable; age eviction at read on idle stream; snapshot-fallback signal still fires after read-time eviction). - formatSseFrame test repurposed: now asserts seq lands in the JSON envelope payload but NEVER in the SSE wire id. - 17/17 stream-state, 11/11 assistant-event, no regressions across 6 hub/SSE-adjacent suites.
forEach's (value, index, array) signature no longer unifies with stampAndBuffer's new (event, options?) signature -- TS rightfully complains about the index: number being passed as the options arg. Wrap in an arrow that pins to the single-arg overload.
| // buffer. Mutates `event.seq` in place. Targeted/exclusion publishes | ||
| // are stamped but not buffered -- replay by `conversationId` alone | ||
| // would leak them to subscribers outside their intended delivery set. | ||
| stampAndBuffer(event, { replayable: publishOptions == null }); |
There was a problem hiding this comment.
we should revisit this replayable in a future PR. doesn't make sense to me that we would not buffer events to specific clients, since they will be missed on reconnect
…B7.2) (#32685) * feat(sse): reconnect handler replays buffered events on lastSeenSeq (B7.2) GET /v1/events now accepts an optional lastSeenSeq query param. When a client reconnects with a cursor scoped to a single conversation, the route handler drains the per-conversation ring buffer for events with seq > lastSeenSeq before emitting the first heartbeat. When the cursor is older than the ring's oldest entry, a single stream_resync_required event is emitted so the client can fetch a snapshot via the normal messages API and resume live from the next event. A high-water dedup watermark on the live callback drops any event that races into the subscription with seq <= the largest replayed seq -- broadcastMessage stamps and rings BEFORE publish, so in-flight events mid-replay are guaranteed to already be in the window we just drained. Adds the stream_resync_required ServerMessage variant in a new message-types/stream.ts module, wired into the ServerMessage union via message-protocol.ts. The resync event is emitted directly into the reconnecting subscriber's stream (never via broadcastMessage), carries no seq, and is never fanned out to other subscribers. 7 new tests cover: in-window replay, snapshot-resync fallback when the ring has evicted past the cursor, omitted-param legacy live-only path, dedup against a live event that duplicates a replayed seq, and three malformed-param rejections (empty / non-integer / negative). Follow-up (separate PR, noted on #32676): revisit the replayable:false behavior for targeted events so the intended recipient of a targeted publish doesn't miss it on reconnect. The right fix is to store the publish-time targeting metadata alongside each ring entry and re-apply the filter at replay time. * chore(openapi): regen for lastSeenSeq query param on /v1/events * refactor(sse): drop stream_resync_required; let client detect gap via seq jump Per review feedback: the daemon doesn't need to surface a special 'cursor too old' signal over the wire. The client already has snapshot refetch paths (messages API), and it can detect the gap purely on its own by comparing the seq of the first live event after reconnect against its persisted lastSeenSeq. Removing the message type shrinks the protocol surface and keeps the resync-from-DB policy entirely client-side where the state machine for 'should I refetch?' is more natural. Changes: - delete src/daemon/message-types/stream.ts and unwire from message-protocol.ts (no new ServerMessage variant) - in /v1/events reconnect handler, when getReplayWindow returns null (cursor older than ring's oldest), do nothing -- connection goes live as if no cursor was passed - buildAssistantEvent import on the routes file becomes unused; removed - replace the 'snapshot-resync signal' test with one that asserts the cursor-too-old path connects live without any extra frame ahead of the heartbeat - update lastSeenSeq query param description in OpenAPI Adjacent regression sweep: 82/82 SSE/hub/stream-state/framing tests still green. --------- Co-authored-by: Vellum Apollo Bot <vellum-apollo-bot[bot]@users.noreply.github.com>
What
Foundation for resumable SSE on mid-turn refresh. Ship unit 1 of 3 for B7.
seqcounter, stamped on every conversation-scoped outbound event atbroadcastMessagetime.formatSseFramenow usesseqas the SSEid:line when present so reconnecting clients sendLast-Event-ID: <seq>per the EventSource spec. UUIDevent.idstays in the JSON payload for per-event uniqueness.getReplayWindow(conversationId, lastSeenSeq)exposed for B7.2's reconnect path. Returnsnullwhen the cursor is older than the oldest buffered entry (snapshot fallback signal).Why
Even with B6 in place (mid-turn debounced persistence, #32602), refresh mid-turn still has a visible seam: DB has content through the last flush (~1s ago), but deltas emitted between the last flush and the refresh point are gone. B6 narrowed the gap from "first 60% of the message" to "~1s slice" — B7 closes the hole entirely by making the wire resumable.
This ship unit is additive only. No client-visible behavior change until B7.2 (server reconnect handler) and B7.3 (client
lastSeenSeq+Last-Event-IDsend).Tunables
Text deltas are ~20-200 bytes each, so 200 events ≈ a generous several seconds of stream. The 256 KB ceiling protects against larger tool-input payloads. 30s covers typical reconnect round-trips comfortably (page reload + SSE reconnect is usually 1-3s).
Why state on a per-conversation counter, not global
Seq is scoped to the conversation so each conversation's replay window is independent. Unscoped broadcasts (
conversation_list_invalidated) don't get a seq — they keep the UUIDevent.idfor the SSEid:line and are not replayable by design (they're system-level system invalidations, not stream data).Files
packages/skill-host-contracts/src/assistant-event.ts— addsseq?: numbertoAssistantEvent;formatSseFrameprefersseqoveridfor the SSE id line.assistant/src/runtime/conversation-stream-state.ts(new) — per-conversation counter + ring buffer module. ExportsstampAndBuffer,getReplayWindow,clearConversationStream, and test-only inspectors.assistant/src/runtime/assistant-event-hub.ts—broadcastMessagecallsstampAndBuffer(event)afterbuildAssistantEvent, before publish.assistant/src/__tests__/conversation-stream-state.test.ts(new) — 13 unit tests.assistant/src/__tests__/assistant-event.test.ts— 2 newformatSseFrametests for seq → id wiring + UUID fallback.Tests
conversation-stream-state:Date.nowshim)getReplayWindowreturns events withseq > lastSeenSeqnullreplay when cursor is older than oldest (snapshot fallback)lastSeenSeq = 0returns full bufferclearConversationStreamdrops state cleanlyformatSseFrametests covering seq → SSE id wiring and UUID fallback for unscoped events.assistant-event-hub,assistant-event-hub-*(4 files),assistant-events-sse-*(2 files),runtime-events-sse*(3 files),daemon-assistant-events,inference-profile-session-handler,disk-pressure-*(3 files).Next (not in this PR)
Last-Event-IDheader inhandleSubscribeAssistantEvents, callgetReplayWindow, emitstream_resync_requiredsnapshot event when the cursor is too old.lastSeenSeqper conversation in localStorage, sendLast-Event-IDon reconnect, handlestream_resync_requiredby resetting row content.Spec:
/scratch/b7-sse-replay-spec.md(workspace-local).