diff --git a/assistant/src/__tests__/tool-preview-lifecycle.test.ts b/assistant/src/__tests__/tool-preview-lifecycle.test.ts index 514bba9cb73..6c73715403d 100644 --- a/assistant/src/__tests__/tool-preview-lifecycle.test.ts +++ b/assistant/src/__tests__/tool-preview-lifecycle.test.ts @@ -314,16 +314,30 @@ describe("tool preview lifecycle", () => { input: { path: "/test" }, }); - // Verify ordering - const eventTypes = collector.events.map((e) => e.type); + // Verify ordering of the legacy tool events. The streaming + // architecture (PR 1 of the streaming-message-architecture plan) + // interleaves additive `message_open` / `block_open` events here, + // so filter to the legacy tool events the test originally covered. + const STREAMING_LIFECYCLE_TYPES = new Set([ + "message_open", + "block_open", + "block_close", + "message_close", + ]); + const eventTypes = collector.events + .map((e) => e.type) + .filter((t) => !STREAMING_LIFECYCLE_TYPES.has(t)); expect(eventTypes).toEqual([ "tool_use_preview_start", "tool_input_delta", "tool_use_start", ]); - // Verify all events carry the same toolUseId + // Verify all tool-scoped events carry the same toolUseId. The new + // streaming lifecycle events are not tool-scoped (they identify + // messages and blocks instead) so they are excluded from the check. for (const event of collector.events) { + if (STREAMING_LIFECYCLE_TYPES.has(event.type)) continue; expect((event as any).toolUseId).toBe(toolUseId); } }); @@ -397,7 +411,18 @@ describe("tool preview lifecycle", () => { isError: false, }); - const eventTypes = collector.events.map((e) => e.type); + // Filter out the additive streaming lifecycle events emitted by the + // streaming-message-architecture plan (PR 1) so the legacy ordering + // assertion still holds. 
+ const STREAMING_LIFECYCLE_TYPES = new Set([ + "message_open", + "block_open", + "block_close", + "message_close", + ]); + const eventTypes = collector.events + .map((e) => e.type) + .filter((t) => !STREAMING_LIFECYCLE_TYPES.has(t)); expect(eventTypes).toEqual([ "tool_use_preview_start", "tool_input_delta", @@ -405,8 +430,11 @@ describe("tool preview lifecycle", () => { "tool_result", ]); - // Verify toolUseId consistency across all events + // Verify toolUseId consistency across tool-scoped events. The new + // streaming lifecycle events identify messages / blocks instead and + // are excluded. for (const event of collector.events) { + if (STREAMING_LIFECYCLE_TYPES.has(event.type)) continue; expect((event as any).toolUseId).toBe(toolUseId); } diff --git a/assistant/src/api/events/assistant-text-delta.ts b/assistant/src/api/events/assistant-text-delta.ts index d4da810badd..0c15f8e0018 100644 --- a/assistant/src/api/events/assistant-text-delta.ts +++ b/assistant/src/api/events/assistant-text-delta.ts @@ -6,11 +6,19 @@ * message; the matching `message_complete` event marks the turn done. * * `messageId` is the database row id of the assistant message this - * delta belongs to — stamped from the pre-allocated turn anchor (see - * `reserveMessage` / `AssistantTurnStartEvent`). Absent on streams - * produced by older daemons that pre-date the anchor protocol, or on - * synthetic deltas (canned greetings, slash-command echoes, live-voice - * transcript injections) that don't bind to a row. + * delta belongs to. The main agent loop (post PR 1 of the + * streaming-message-architecture plan) always allocates and emits a + * `messageId` for every delta it produces, via `ensureMessageOpen` in + * `conversation-agent-loop-handlers.ts`. 
The field stays optional in + * this schema because synthetic emitters that don't bind to a persisted + * row (canned greetings, slash-command echoes, live-voice transcript + * injections, wake-target replays, recording handler echoes) still emit + * deltas without one; those streams are consumed by channel adapters, + * not by the `MessageStreamReducer` path. + * + * `blockIndex` and `seq` are populated whenever `messageId` is, so a + * client receiving any of the three is guaranteed to receive all three + * (idempotent reducer keying invariant — see `MessageStreamReducer`). * * Canonical wire-contract source. Daemon code imports the type directly * from this file; external consumers import via `@vellumai/assistant-api`. @@ -23,6 +31,14 @@ export const AssistantTextDeltaEventSchema = z type: z.literal("assistant_text_delta"), text: z.string(), messageId: z.string().optional(), + /** 0-based content-block index within the parent `messageId`. Optional + * for backwards compatibility with synthetic deltas that don't bind + * to a block. */ + blockIndex: z.number().optional(), + /** Monotonically increasing per-conversation sequence number for + * idempotent client replay. Optional during the streaming-architecture + * rollout — daemons that pre-date PR 1 of the plan omit it. */ + seq: z.number().optional(), conversationId: z.string().optional(), }) .strict(); diff --git a/assistant/src/api/events/block-close.ts b/assistant/src/api/events/block-close.ts new file mode 100644 index 00000000000..26361d326b1 --- /dev/null +++ b/assistant/src/api/events/block-close.ts @@ -0,0 +1,27 @@ +/** + * `block_close` SSE event. + * + * Emitted when a content block within an assistant message ends — the + * peer of `block_open`. Text blocks close when the next non-text content + * starts (or when the turn ends); tool_use blocks close when their + * matching `tool_result` arrives. Clients should treat `(messageId, + * blockIndex)` as the block identity for idempotent application. 
+ * + * Canonical wire-contract source. Daemon code imports the type directly + * from this file; external consumers import via `@vellumai/assistant-api`. + */ + +import { z } from "zod"; + +export const BlockCloseEventSchema = z + .object({ + type: z.literal("block_close"), + messageId: z.string(), + blockIndex: z.number(), + /** Monotonically increasing per-conversation sequence number. */ + seq: z.number(), + conversationId: z.string().optional(), + }) + .strict(); + +export type BlockCloseEvent = z.infer; diff --git a/assistant/src/api/events/block-open.ts b/assistant/src/api/events/block-open.ts new file mode 100644 index 00000000000..5a6feac435d --- /dev/null +++ b/assistant/src/api/events/block-open.ts @@ -0,0 +1,41 @@ +/** + * `block_open` SSE event. + * + * Emitted when a new content block within an assistant message starts — + * paired with `block_close` at the block's end. Carries the message and + * block coordinates that every block-scoped event (`assistant_text_delta`, + * `tool_use_start`, `tool_input_delta`, `tool_result`) stamps in this turn. + * + * Block kinds today: + * - `text` — a streamed text block opened on the first text delta + * emitted after the previous block closed. + * - `tool_use` — a tool invocation; opened immediately before the + * matching `tool_use_start` event and closed when the + * corresponding `tool_result` arrives. + * + * `blockIndex` is 0-based and monotonically increases within a single + * message; it never repeats across blocks in the same `messageId`. + * + * Canonical wire-contract source. Daemon code imports the type directly + * from this file; external consumers import via `@vellumai/assistant-api`. + */ + +import { z } from "zod"; + +export const BlockOpenEventSchema = z + .object({ + type: z.literal("block_open"), + messageId: z.string(), + blockIndex: z.number(), + blockType: z.enum(["text", "tool_use"]), + /** Tool name when `blockType` is `tool_use`; omitted otherwise. 
*/ + toolName: z.string().optional(), + /** Tool-use id when `blockType` is `tool_use`; omitted otherwise. */ + toolUseId: z.string().optional(), + /** Monotonically increasing per-conversation sequence number. */ + seq: z.number(), + conversationId: z.string().optional(), + }) + .strict(); + +export type BlockOpenEvent = z.infer; diff --git a/assistant/src/api/events/message-close.ts b/assistant/src/api/events/message-close.ts new file mode 100644 index 00000000000..de9b5d82025 --- /dev/null +++ b/assistant/src/api/events/message-close.ts @@ -0,0 +1,25 @@ +/** + * `message_close` SSE event. + * + * Emitted at the end of an assistant turn — the peer of `message_open`, + * carrying the same `messageId`. Marks the turn done in the new + * streaming architecture; the legacy `message_complete` event continues + * to fire alongside it during the rollout for backward compatibility. + * + * Canonical wire-contract source. Daemon code imports the type directly + * from this file; external consumers import via `@vellumai/assistant-api`. + */ + +import { z } from "zod"; + +export const MessageCloseEventSchema = z + .object({ + type: z.literal("message_close"), + messageId: z.string(), + /** Monotonically increasing per-conversation sequence number. */ + seq: z.number(), + conversationId: z.string().optional(), + }) + .strict(); + +export type MessageCloseEvent = z.infer; diff --git a/assistant/src/api/events/message-complete.ts b/assistant/src/api/events/message-complete.ts index db91b95d3dc..60cb751edb4 100644 --- a/assistant/src/api/events/message-complete.ts +++ b/assistant/src/api/events/message-complete.ts @@ -18,6 +18,17 @@ * side effects on `source !== "aux"`. Absent is treated as `"main"` * for backwards compatibility. * + * Streaming-architecture status (post-PR 6): the new addressable-event + * protocol uses `message_open` / `message_close` as the canonical + * lifecycle pair for the `MessageStreamReducer` path. 
`message_complete` + * remains the canonical end-of-turn signal for the wider event-consumer + * fleet — CLI, voice session bridge, channel retry sweep, background + * dispatch — which still gate on it for side effects (task-complete + * sound, attachment delivery, channel idle bookkeeping). It is the + * post-persistence event with the authoritative DB row id, attachments, + * and `source` discriminator; `message_close` is the pre-persistence + * streaming signal and does not carry those fields. + * * Canonical wire-contract source. Daemon code imports the type directly * from this file; external consumers import via `@vellumai/assistant-api`. */ diff --git a/assistant/src/api/events/message-open.ts b/assistant/src/api/events/message-open.ts new file mode 100644 index 00000000000..62a9945bc0d --- /dev/null +++ b/assistant/src/api/events/message-open.ts @@ -0,0 +1,32 @@ +/** + * `message_open` SSE event. + * + * Emitted by the daemon on the first content emission of an assistant + * turn — before the first `assistant_text_delta` or `tool_use_start` — + * to declare a stable `messageId` (UUIDv7) for the message that the rest + * of the turn's events will stamp on their `messageId` field. Paired with + * `message_close` at end-of-turn. Clients should anchor a bubble at + * `message_open` instead of inferring identity from the first delta. + * + * Additive alongside the legacy `assistant_text_delta` + `message_complete` + * pair during the streaming-architecture rollout; new clients prefer the + * `message_open` / `block_open` / `block_close` / `message_close` shape. + * + * Canonical wire-contract source. Daemon code imports the type directly + * from this file; external consumers import via `@vellumai/assistant-api`. + */ + +import { z } from "zod"; + +export const MessageOpenEventSchema = z + .object({ + type: z.literal("message_open"), + messageId: z.string(), + role: z.enum(["assistant"]), + /** Monotonically increasing per-conversation sequence number. 
*/ + seq: z.number(), + conversationId: z.string().optional(), + }) + .strict(); + +export type MessageOpenEvent = z.infer; diff --git a/assistant/src/api/events/tool-use-start.ts b/assistant/src/api/events/tool-use-start.ts index b76f7d47409..49c6f6c67eb 100644 --- a/assistant/src/api/events/tool-use-start.ts +++ b/assistant/src/api/events/tool-use-start.ts @@ -25,6 +25,11 @@ export const ToolUseStartEventSchema = z input: z.record(z.string(), z.unknown()), toolUseId: z.string().optional(), messageId: z.string().optional(), + /** 0-based content-block index within the parent `messageId`. */ + blockIndex: z.number().optional(), + /** Monotonically increasing per-conversation sequence number for + * idempotent client replay. */ + seq: z.number().optional(), conversationId: z.string().optional(), }) .strict(); diff --git a/assistant/src/api/index.ts b/assistant/src/api/index.ts index 619e59ada79..50f16ac5fba 100644 --- a/assistant/src/api/index.ts +++ b/assistant/src/api/index.ts @@ -2,13 +2,17 @@ import { z } from "zod"; import { AssistantTextDeltaEventSchema } from "./events/assistant-text-delta.js"; import { AssistantTurnStartEventSchema } from "./events/assistant-turn-start.js"; +import { BlockCloseEventSchema } from "./events/block-close.js"; +import { BlockOpenEventSchema } from "./events/block-open.js"; import { DocumentCommentCreatedEventSchema } from "./events/document-comment-created.js"; import { DocumentCommentDeletedEventSchema } from "./events/document-comment-deleted.js"; import { DocumentCommentReopenedEventSchema } from "./events/document-comment-reopened.js"; import { DocumentCommentResolvedEventSchema } from "./events/document-comment-resolved.js"; import { GenerationCancelledEventSchema } from "./events/generation-cancelled.js"; import { GenerationHandoffEventSchema } from "./events/generation-handoff.js"; +import { MessageCloseEventSchema } from "./events/message-close.js"; import { MessageCompleteEventSchema } from 
"./events/message-complete.js"; +import { MessageOpenEventSchema } from "./events/message-open.js"; import { OpenUrlEventSchema } from "./events/open-url.js"; import { RelationshipStateUpdatedEventSchema } from "./events/relationship-state-updated.js"; import { ToolUseStartEventSchema } from "./events/tool-use-start.js"; @@ -26,6 +30,14 @@ export { type AssistantTurnStartEvent, AssistantTurnStartEventSchema, } from "./events/assistant-turn-start.js"; +export { + type BlockCloseEvent, + BlockCloseEventSchema, +} from "./events/block-close.js"; +export { + type BlockOpenEvent, + BlockOpenEventSchema, +} from "./events/block-open.js"; export { type DocumentCommentCreatedEvent, DocumentCommentCreatedEventSchema, @@ -50,10 +62,18 @@ export { type GenerationHandoffEvent, GenerationHandoffEventSchema, } from "./events/generation-handoff.js"; +export { + type MessageCloseEvent, + MessageCloseEventSchema, +} from "./events/message-close.js"; export { type MessageCompleteEvent, MessageCompleteEventSchema, } from "./events/message-complete.js"; +export { + type MessageOpenEvent, + MessageOpenEventSchema, +} from "./events/message-open.js"; export { type OpenUrlEvent, OpenUrlEventSchema } from "./events/open-url.js"; export { type RelationshipStateUpdatedEvent, @@ -108,13 +128,17 @@ export { export const AssistantEventSchema = z.discriminatedUnion("type", [ AssistantTextDeltaEventSchema, AssistantTurnStartEventSchema, + BlockCloseEventSchema, + BlockOpenEventSchema, DocumentCommentCreatedEventSchema, DocumentCommentDeletedEventSchema, DocumentCommentReopenedEventSchema, DocumentCommentResolvedEventSchema, GenerationCancelledEventSchema, GenerationHandoffEventSchema, + MessageCloseEventSchema, MessageCompleteEventSchema, + MessageOpenEventSchema, OpenUrlEventSchema, RelationshipStateUpdatedEventSchema, ToolUseStartEventSchema, diff --git a/assistant/src/daemon/conversation-agent-loop-handlers.ts b/assistant/src/daemon/conversation-agent-loop-handlers.ts index 
9fcaf217d9c..2270d3ae19c 100644 --- a/assistant/src/daemon/conversation-agent-loop-handlers.ts +++ b/assistant/src/daemon/conversation-agent-loop-handlers.ts @@ -7,7 +7,7 @@ */ import type pino from "pino"; -import { v4 as uuid } from "uuid"; +import { v4 as uuid, v7 as uuidv7 } from "uuid"; import type { AgentEvent } from "../agent/loop.js"; import type { @@ -71,6 +71,7 @@ import { } from "./conversation-error.js"; import { isProviderOrderingError } from "./conversation-slash.js"; import { resolveTurnTimezoneContext } from "./date-context.js"; +import { appendEvent } from "./event-log.js"; import type { CardSurfaceData, ServerMessage, @@ -81,6 +82,7 @@ import type { WebSearchMetadata, WebSearchResultItem, } from "./message-types/web-activity.js"; +import { nextSeq } from "./streaming-events.js"; const log = getLogger("agent-loop-handlers"); @@ -218,6 +220,27 @@ export interface EventHandlerState { readonly serverToolStartedAt: Map; /** Original input from server_tool_start, keyed by tool_use_id, so the complete handler can read the query. */ readonly serverToolInputs: Map>; + // ── Streaming architecture (PR 1) — pre-assigned message_id, block_index, seq + /** + * UUIDv7 pre-assigned on the first content emission of the turn (text + * delta or tool_use). The same id is reused as the row primary key when + * `handleMessageComplete` persists the assistant message, so every + * streaming event for the turn — and the final persisted row — share + * one stable identifier. + */ + currentMessageId: string | undefined; + /** + * 0-based content-block counter for the turn. Bumped each time a new + * block opens (text → tool_use, tool_use → text, tool_use → tool_use). + * `undefined` until the first block opens. + */ + currentBlockIndex: number | undefined; + /** Kind of the currently-open block, or `null` between blocks. 
*/ + currentBlockType: "text" | "tool_use" | null; + /** Maps `toolUseId → blockIndex` so `tool_input_delta` and `tool_result` + * events can be stamped with the same block coordinate as the matching + * `tool_use_start` / `block_open`. */ + readonly toolUseIdToBlockIndex: Map; } /** Immutable context shared across event handlers within a single agent loop run. */ @@ -277,6 +300,10 @@ export function createEventHandlerState(): EventHandlerState { turnStartedAt: Date.now(), serverToolStartedAt: new Map(), serverToolInputs: new Map(), + currentMessageId: undefined, + currentBlockIndex: undefined, + currentBlockType: null, + toolUseIdToBlockIndex: new Map(), }; } @@ -403,6 +430,137 @@ function resolveAssistantReplyTimestampTimezone( }).effectiveTimezone; } +// ── Streaming architecture helpers (PR 1) ──────────────────────────── +// +// These keep the new addressable-event protocol additive alongside the +// legacy `assistant_text_delta` + `message_complete` shape. The agent +// loop pre-assigns a UUIDv7 the first time the turn produces content, +// then re-uses it when the row finally persists in +// `handleMessageComplete`. Every streaming event for the turn carries +// `(messageId, blockIndex, seq)` so reconnecting clients can dedupe +// and order events deterministically. + +/** + * Emit a streaming event over the live SSE channel and append it to the + * durable event log so `Last-Event-Id` reconnect replay can resume from + * any point in the turn. Caller is responsible for stamping `seq` + * (typically via {@link nextSeq}) before invocation — `appendEvent` + * filters out events without a `seq` and is a no-op for non-streaming + * event types. + * + * The publish-then-log order matches the priority of the two paths: + * the live consumer sees the event as soon as possible, and a DB + * failure during the durable write is logged inside `appendEvent` but + * never tears down the in-flight turn. 
+ */ +function emitWithLog(deps: EventHandlerDeps, event: ServerMessage): void { + deps.onEvent(event); + appendEvent(deps.ctx.conversationId, event); +} + +/** + * Ensure the turn's assistant message id is allocated and `message_open` + * has been emitted. Called from the first text-delta and tool-use entry + * points. Idempotent — subsequent calls in the same turn are no-ops. + */ +function ensureMessageOpen( + state: EventHandlerState, + deps: EventHandlerDeps, +): string { + if (state.currentMessageId) return state.currentMessageId; + const messageId = uuidv7(); + state.currentMessageId = messageId; + emitWithLog(deps, { + type: "message_open", + conversationId: deps.ctx.conversationId, + messageId, + role: "assistant", + seq: nextSeq(deps.ctx.conversationId), + }); + return messageId; +} + +/** + * Close the currently-open block, if any. Emits a `block_close` event + * stamped with the open block's index. The caller is expected to + * advance to a new block (via {@link openBlock}) when applicable. + */ +function closeCurrentBlock( + state: EventHandlerState, + deps: EventHandlerDeps, +): void { + if (state.currentBlockType === null || state.currentBlockIndex === undefined) + return; + const messageId = state.currentMessageId; + if (!messageId) return; + emitWithLog(deps, { + type: "block_close", + conversationId: deps.ctx.conversationId, + messageId, + blockIndex: state.currentBlockIndex, + seq: nextSeq(deps.ctx.conversationId), + }); + state.currentBlockType = null; +} + +/** + * Open a new content block (`text` or `tool_use`). Allocates the next + * `blockIndex`, updates the state, and emits `block_open`. Every caller + * runs {@link ensureMessageOpen} first so `state.currentMessageId` is + * always set. 
+ */ +function openBlock( + state: EventHandlerState, + deps: EventHandlerDeps, + blockType: "text" | "tool_use", + toolInfo?: { toolName: string; toolUseId: string }, +): number { + const messageId = state.currentMessageId!; + const nextIndex = + state.currentBlockIndex === undefined ? 0 : state.currentBlockIndex + 1; + state.currentBlockIndex = nextIndex; + state.currentBlockType = blockType; + if (blockType === "tool_use" && toolInfo) { + state.toolUseIdToBlockIndex.set(toolInfo.toolUseId, nextIndex); + } + emitWithLog(deps, { + type: "block_open", + conversationId: deps.ctx.conversationId, + messageId, + blockIndex: nextIndex, + blockType, + ...(toolInfo + ? { toolName: toolInfo.toolName, toolUseId: toolInfo.toolUseId } + : {}), + seq: nextSeq(deps.ctx.conversationId), + }); + return nextIndex; +} + +/** + * Ensure the currently-open block matches the requested kind, opening a + * new block (and closing the prior one) when the kind changes or when + * no block is open yet. Returns the resolved `blockIndex`. + * + * `tool_use` blocks are always opened anew so each tool invocation gets + * its own block index, even when consecutive tool calls fire without an + * intervening text block. 
+ */ +function ensureBlockForKind( + state: EventHandlerState, + deps: EventHandlerDeps, + kind: "text" | "tool_use", + toolInfo?: { toolName: string; toolUseId: string }, +): number { + if (kind === "text" && state.currentBlockType === "text") { + return state.currentBlockIndex!; + } + if (state.currentBlockType !== null) { + closeCurrentBlock(state, deps); + } + return openBlock(state, deps, kind, toolInfo); +} + // ── Individual Handlers ────────────────────────────────────────────── function handleTextDelta( @@ -427,10 +585,15 @@ function handleTextDelta( "Thinking", ); } - deps.onEvent({ + const messageId = ensureMessageOpen(state, deps); + const blockIndex = ensureBlockForKind(state, deps, "text"); + emitWithLog(deps, { type: "assistant_text_delta", text: drained.emitText, conversationId: deps.ctx.conversationId, + messageId, + blockIndex, + seq: nextSeq(deps.ctx.conversationId), }); if (deps.shouldGenerateTitle) state.firstAssistantText += drained.emitText; } @@ -492,12 +655,20 @@ export function handleToolUse( deps.reqId, statusText, ); - deps.onEvent({ + const messageId = ensureMessageOpen(state, deps); + const blockIndex = ensureBlockForKind(state, deps, "tool_use", { + toolName: event.name, + toolUseId: event.id, + }); + emitWithLog(deps, { type: "tool_use_start", toolName: event.name, input: event.input, conversationId: deps.ctx.conversationId, toolUseId: event.id, + messageId, + blockIndex, + seq: nextSeq(deps.ctx.conversationId), }); } @@ -598,7 +769,7 @@ function handleToolOutputChunk( } export function handleInputJsonDelta( - _state: EventHandlerState, + state: EventHandlerState, deps: EventHandlerDeps, event: Extract, ): void { @@ -606,12 +777,16 @@ export function handleInputJsonDelta( // stream for app_create code previews. Non-app tools would send large // cumulative JSON on every delta with no benefit. 
if (!APP_TOOL_NAMES.has(event.toolName)) return; - deps.onEvent({ + const blockIndex = state.toolUseIdToBlockIndex.get(event.toolUseId); + emitWithLog(deps, { type: "tool_input_delta", toolName: event.toolName, content: event.accumulatedJson, conversationId: deps.ctx.conversationId, toolUseId: event.toolUseId, + ...(state.currentMessageId ? { messageId: state.currentMessageId } : {}), + ...(blockIndex !== undefined ? { blockIndex } : {}), + seq: nextSeq(deps.ctx.conversationId), }); } @@ -730,7 +905,8 @@ export function handleToolResult( } // Send to client last so state is consistent even if onEvent throws. - deps.onEvent({ + const toolBlockIndex = state.toolUseIdToBlockIndex.get(event.toolUseId); + emitWithLog(deps, { type: "tool_result", toolName: "", result: event.content, @@ -752,7 +928,22 @@ export function handleToolResult( approvalReason: event.approvalReason, riskThreshold: event.riskThreshold, activityMetadata: event.activityMetadata, + ...(state.currentMessageId ? { messageId: state.currentMessageId } : {}), + ...(toolBlockIndex !== undefined ? { blockIndex: toolBlockIndex } : {}), + seq: nextSeq(deps.ctx.conversationId), }); + + // Close the tool_use block now that its result has been delivered. We + // can only safely emit `block_close` when the current block is still + // the matching tool_use — if the model has already started emitting + // text (which would have opened a new block in `handleTextDelta`) the + // earlier block close fired there. 
+ if ( + state.currentBlockType === "tool_use" && + state.currentBlockIndex === toolBlockIndex + ) { + closeCurrentBlock(state, deps); + } } /** @@ -990,10 +1181,15 @@ export async function handleMessageComplete( // Flush any remaining directive display buffer if (state.pendingDirectiveDisplayBuffer.length > 0) { - deps.onEvent({ + const flushMessageId = ensureMessageOpen(state, deps); + const flushBlockIndex = ensureBlockForKind(state, deps, "text"); + emitWithLog(deps, { type: "assistant_text_delta", text: state.pendingDirectiveDisplayBuffer, conversationId: deps.ctx.conversationId, + messageId: flushMessageId, + blockIndex: flushBlockIndex, + seq: nextSeq(deps.ctx.conversationId), }); if (deps.shouldGenerateTitle) state.firstAssistantText += state.pendingDirectiveDisplayBuffer; @@ -1161,6 +1357,14 @@ export async function handleMessageComplete( // pipeline. No `syncToDisk` here — the orchestrator separately invokes // `syncMessageToDisk` on `state.lastAssistantMessageId` after the loop // completes (see `conversation-agent-loop.ts::syncLastAssistantMessageToDisk`). + // + // When the agent loop opened a streaming message earlier this turn + // (`state.currentMessageId` was assigned in `ensureMessageOpen`), reuse + // that pre-allocated id as the row primary key so every event the daemon + // already streamed — text deltas, tool_use/tool_result, the new + // message_open / block_open / block_close events — references the same + // id that downstream consumers (message_complete, history reload, + // memory backfills) see. const assistantPersistResult = (await runPipeline( "persistence", getMiddlewaresFor("persistence"), @@ -1171,6 +1375,9 @@ export async function handleMessageComplete( role: "assistant", content: JSON.stringify(contentForPersistence), metadata: assistantChannelMetadata, + ...(state.currentMessageId + ? 
{ addOptions: { messageId: state.currentMessageId } } + : {}), }, buildHandlerTurnContext(deps), DEFAULT_TIMEOUTS.persistence, @@ -1178,6 +1385,37 @@ export async function handleMessageComplete( const assistantMsg = assistantPersistResult.message; state.lastAssistantMessageId = assistantMsg.id; + // Streaming architecture: close any block still open, then emit the + // turn's `message_close`. Synthesizes a `message_open` first when the + // turn produced no content events (canned messages, aux notifier + // injections, slash commands), so clients that only consume the new + // protocol still see a complete `message_open` → `message_close` pair + // for every persisted assistant row. + if (!state.currentMessageId) { + state.currentMessageId = assistantMsg.id; + emitWithLog(deps, { + type: "message_open", + conversationId: deps.ctx.conversationId, + messageId: assistantMsg.id, + role: "assistant", + seq: nextSeq(deps.ctx.conversationId), + }); + } + if (state.currentBlockType !== null) { + closeCurrentBlock(state, deps); + } + emitWithLog(deps, { + type: "message_close", + conversationId: deps.ctx.conversationId, + messageId: state.currentMessageId, + seq: nextSeq(deps.ctx.conversationId), + }); + // Reset per-message streaming state so the next turn starts fresh. + state.currentMessageId = undefined; + state.currentBlockIndex = undefined; + state.currentBlockType = null; + state.toolUseIdToBlockIndex.clear(); + // Backfill message_id on all LLM request logs from this turn. // The agent loop is single-threaded per conversation, so all rows with // message_id IS NULL belong to the current turn. 
@@ -1436,12 +1674,20 @@ export async function dispatchAgentEvent( ); state.serverToolStartedAt.set(event.toolUseId, Date.now()); state.serverToolInputs.set(event.toolUseId, event.input); - deps.onEvent({ + const messageId = ensureMessageOpen(state, deps); + const blockIndex = ensureBlockForKind(state, deps, "tool_use", { + toolName: event.name, + toolUseId: event.toolUseId, + }); + emitWithLog(deps, { type: "tool_use_start", toolName: event.name, input: event.input, conversationId: deps.ctx.conversationId, toolUseId: event.toolUseId, + messageId, + blockIndex, + seq: nextSeq(deps.ctx.conversationId), }); break; } @@ -1514,7 +1760,10 @@ export async function dispatchAgentEvent( .map((r) => `${r.title}\n${r.url}`) .join("\n\n"); - deps.onEvent({ + const serverToolBlockIndex = state.toolUseIdToBlockIndex.get( + event.toolUseId, + ); + emitWithLog(deps, { type: "tool_result", toolName: "web_search", result: resultText, @@ -1522,7 +1771,20 @@ export async function dispatchAgentEvent( conversationId: deps.ctx.conversationId, toolUseId: event.toolUseId, ...(metadata ? { activityMetadata: { webSearch: metadata } } : {}), + ...(state.currentMessageId + ? { messageId: state.currentMessageId } + : {}), + ...(serverToolBlockIndex !== undefined + ? 
{ blockIndex: serverToolBlockIndex } + : {}), + seq: nextSeq(deps.ctx.conversationId), }); + if ( + state.currentBlockType === "tool_use" && + state.currentBlockIndex === serverToolBlockIndex + ) { + closeCurrentBlock(state, deps); + } break; } case "error": diff --git a/assistant/src/daemon/conversation-store.ts b/assistant/src/daemon/conversation-store.ts index 54011117e88..2243aaa3607 100644 --- a/assistant/src/daemon/conversation-store.ts +++ b/assistant/src/daemon/conversation-store.ts @@ -28,6 +28,7 @@ import { getSandboxWorkingDir } from "../util/platform.js"; import { Conversation } from "./conversation.js"; import type { ConversationEvictor } from "./conversation-evictor.js"; import type { ConversationCreateOptions } from "./handlers/shared.js"; +import { resetSeq } from "./streaming-events.js"; import { buildTransportHints } from "./transport-hints.js"; // ── Private store ────────────────────────────────────────────────── @@ -336,6 +337,7 @@ export function destroyActiveConversation(conversationId: string): void { conversation.dispose(); deleteConversation(conversationId); deleteConversationOptions(conversationId); + resetSeq(conversationId); } /** @@ -348,6 +350,7 @@ export function clearAllActiveConversations(): number { for (const id of conversationIds()) { removeFromEvictor(id); subagentManager.abortAllForParent(id); + resetSeq(id); } for (const conversation of allConversations()) { conversation.dispose(); diff --git a/assistant/src/daemon/event-log.ts b/assistant/src/daemon/event-log.ts new file mode 100644 index 00000000000..a8b418ede2c --- /dev/null +++ b/assistant/src/daemon/event-log.ts @@ -0,0 +1,215 @@ +/** + * Durable per-conversation streaming-event log. + * + * Every streaming event emitted during an assistant turn is appended to the + * `conversation_events` table alongside the live SSE publish. 
The table + * backs the SSE `Last-Event-Id` reconnect protocol: when a client drops a + * stream mid-turn, it re-subscribes with the last `seq` it observed and + * the daemon replays every persisted row with `seq > last_event_id` + * before fanning live events into the new connection. + * + * Storage is intentionally append-only; rows are pruned by + * {@link trimEventsOlderThan} (called from the daemon's startup cleanup + * hook) rather than per-conversation truncation, so a reconnecting client + * can always resume from the last `seq` it observed even when the + * conversation has since been evicted from in-memory state. + * + * The neutral `AssistantEvent` envelope is what subscribers receive on the + * wire, so we persist the full envelope JSON to keep replay verbatim. The + * `event_type`, `message_id`, and `block_index` columns are denormalized + * from the payload to support targeted queries (debugging, future + * per-message replay) without re-parsing every row. + */ + +import { rawAll, rawRun } from "../memory/raw-query.js"; +import { getLogger } from "../util/logger.js"; +import type { ServerMessage } from "./message-protocol.js"; + +const log = getLogger("event-log"); + +/** + * Event types persisted to the durable log. Restricting writes to the + * streaming-architecture event set keeps the log small and avoids racing + * with bespoke broadcasts (sync invalidations, host-proxy traffic) that + * are not part of the replay protocol. The set mirrors the events PR 1 + * stamped with `seq` — `message_complete` and `generation_handoff` are + * intentionally absent because they still emit without `seq` from the + * orchestrator and replay would deliver them out of order relative to + * the addressable events around them. 
+ */ +const LOGGED_EVENT_TYPES = new Set([ + "message_open", + "block_open", + "block_close", + "message_close", + "assistant_text_delta", + "tool_use_start", + "tool_input_delta", + "tool_result", +]); + +/** Row shape returned by {@link readEventsAfter}. */ +export interface ConversationEventRow { + conversationId: string; + seq: number; + eventType: string; + messageId: string | null; + blockIndex: number | null; + payloadJson: string; + createdAt: number; +} + +/** True when the event type is one we persist to the durable log. */ +export function isLoggableEvent(eventType: string): boolean { + return LOGGED_EVENT_TYPES.has(eventType); +} + +interface EventLike { + type: string; + seq?: number; + messageId?: string; + blockIndex?: number; +} + +/** + * Append a streaming event to the durable log. + * + * No-op when the event type is not in {@link LOGGED_EVENT_TYPES} or when + * the event does not carry a `seq` field (only post-PR-1 streaming events + * are addressable, so writing a row without `seq` would create a gap that + * the replay path cannot honor). + * + * Errors from the DB write are caught and logged — the live SSE publish + * already succeeded, so a write failure here only degrades replay, never + * the in-flight turn. Mirrors the non-fatal stance used across the + * agent-loop persistence calls. + */ +export function appendEvent( + conversationId: string, + message: ServerMessage, +): void { + if (!isLoggableEvent(message.type)) return; + const event = message as EventLike; + if (typeof event.seq !== "number") return; + try { + rawRun( + `INSERT INTO conversation_events + (conversation_id, seq, event_type, message_id, block_index, payload_json, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?) + ON CONFLICT (conversation_id, seq) DO NOTHING`, + conversationId, + event.seq, + message.type, + typeof event.messageId === "string" ? event.messageId : null, + typeof event.blockIndex === "number" ? 
event.blockIndex : null, + JSON.stringify(message), + Date.now(), + ); + } catch (err) { + log.warn( + { err, conversationId, eventType: message.type, seq: event.seq }, + "Failed to append conversation event (non-fatal)", + ); + } +} + +/** + * Return every event for `conversationId` with `seq > afterSeq`, in seq + * order. Used by the SSE handshake to replay events the client missed + * before subscribing to live events. + */ +export function readEventsAfter( + conversationId: string, + afterSeq: number, +): ConversationEventRow[] { + return rawAll<{ + conversation_id: string; + seq: number; + event_type: string; + message_id: string | null; + block_index: number | null; + payload_json: string; + created_at: number; + }>( + `SELECT conversation_id, seq, event_type, message_id, block_index, + payload_json, created_at + FROM conversation_events + WHERE conversation_id = ? AND seq > ? + ORDER BY seq ASC`, + conversationId, + afterSeq, + ).map((row) => ({ + conversationId: row.conversation_id, + seq: row.seq, + eventType: row.event_type, + messageId: row.message_id, + blockIndex: row.block_index, + payloadJson: row.payload_json, + createdAt: row.created_at, + })); +} + +/** + * Read the highest persisted `seq` for `conversationId`. Used by + * {@link nextSeq} to reseed the in-memory counter when a conversation + * re-enters memory after eviction, so a fresh `nextSeq()` cannot collide + * with rows the previous incarnation wrote. + * + * Returns `0` when the DB is unavailable or the query fails — the worst + * case is a sequence-collision risk for evicted conversations under DB + * stress, which is bounded by the 7-day retention window. The fallback + * also keeps unit tests that exercise streaming handlers without a real + * DB working without per-test mocking. 
+ */ +export function maxSeqForConversation(conversationId: string): number { + try { + const row = rawAll<{ max_seq: number | null }>( + `SELECT MAX(seq) AS max_seq FROM conversation_events WHERE conversation_id = ?`, + conversationId, + )[0]; + return row?.max_seq ?? 0; + } catch { + return 0; + } +} + +/** + * Drop event rows older than `cutoffMs` (epoch ms). Returns the number of + * rows deleted. The default retention window is 7 days — long enough to + * absorb realistic reconnect gaps (network drops, app backgrounding, + * device sleep) while keeping the table bounded. + */ +export function trimEventsOlderThan(cutoffMs: number): number { + try { + return rawRun( + `DELETE FROM conversation_events WHERE created_at < ?`, + cutoffMs, + ); + } catch (err) { + log.warn( + { err, cutoffMs }, + "Failed to trim conversation_events (non-fatal)", + ); + return 0; + } +} + +/** Default retention window for the durable event log. */ +export const EVENT_LOG_RETENTION_MS = 7 * 24 * 60 * 60 * 1000; + +/** + * Run a single retention sweep using {@link EVENT_LOG_RETENTION_MS}. + * + * Called once at daemon startup. Keeping the cleanup minimal (single + * sweep at boot rather than a periodic cron) is intentional — the table + * is small for active installs and the next boot will catch up any + * accumulation between restarts. 
+ */ +export function runEventLogCleanup(): number { + const cutoff = Date.now() - EVENT_LOG_RETENTION_MS; + const deleted = trimEventsOlderThan(cutoff); + if (deleted > 0) { + log.info({ deleted, cutoff }, "Trimmed conversation_events"); + } + return deleted; +} diff --git a/assistant/src/daemon/lifecycle.ts b/assistant/src/daemon/lifecycle.ts index 850242d188d..7f3600f9d78 100644 --- a/assistant/src/daemon/lifecycle.ts +++ b/assistant/src/daemon/lifecycle.ts @@ -938,6 +938,17 @@ export async function runDaemon(): Promise { } } + // Trim the durable streaming-event log to the configured retention window + // so the table cannot grow unbounded across daemon restarts. Single + // sweep at startup is intentional — the next boot catches up anything + // accumulated since. + try { + const { runEventLogCleanup } = await import("./event-log.js"); + runEventLogCleanup(); + } catch (err) { + log.warn({ err }, "Conversation event-log cleanup failed — continuing"); + } + registerWatcherProviders(); registerMessagingProviders(); diff --git a/assistant/src/daemon/message-types/messages.ts b/assistant/src/daemon/message-types/messages.ts index 780dbb71ef3..e3b54b13b7d 100644 --- a/assistant/src/daemon/message-types/messages.ts +++ b/assistant/src/daemon/message-types/messages.ts @@ -2,7 +2,11 @@ import type { AssistantTextDeltaEvent } from "../../api/events/assistant-text-delta.js"; import type { AssistantTurnStartEvent } from "../../api/events/assistant-turn-start.js"; +import type { BlockCloseEvent } from "../../api/events/block-close.js"; +import type { BlockOpenEvent } from "../../api/events/block-open.js"; +import type { MessageCloseEvent } from "../../api/events/message-close.js"; import type { MessageCompleteEvent } from "../../api/events/message-complete.js"; +import type { MessageOpenEvent } from "../../api/events/message-open.js"; import type { ToolUseStartEvent } from "../../api/events/tool-use-start.js"; import type { ChannelId, InterfaceId } from 
"../../channels/types.js"; import type { CommandIntent, UserMessageAttachment } from "./shared.js"; @@ -120,6 +124,10 @@ export interface ToolInputDelta { /** Database ID of the assistant message that owns this tool_use block. * Same semantics as `AssistantTextDeltaEvent.messageId`. */ messageId?: string; + /** 0-based content-block index within the parent `messageId`. */ + blockIndex?: number; + /** Monotonically increasing per-conversation sequence number. */ + seq?: number; } export interface ToolResult { @@ -183,6 +191,10 @@ export interface ToolResult { /** Structured activity metadata for rich client rendering. Optional; old * clients that key off `result` continue to work unchanged. */ activityMetadata?: ToolActivityMetadata; + /** 0-based content-block index within the parent `messageId`. */ + blockIndex?: number; + /** Monotonically increasing per-conversation sequence number. */ + seq?: number; } export interface ConfirmationRequest { @@ -532,6 +544,10 @@ export type _MessagesServerMessages = | ConfirmationRequest | SecretRequest | QuestionRequest + | MessageOpenEvent + | BlockOpenEvent + | BlockCloseEvent + | MessageCloseEvent | MessageCompleteEvent | ErrorMessage | MessageQueued diff --git a/assistant/src/daemon/streaming-events.ts b/assistant/src/daemon/streaming-events.ts new file mode 100644 index 00000000000..d31845cc90b --- /dev/null +++ b/assistant/src/daemon/streaming-events.ts @@ -0,0 +1,56 @@ +/** + * Per-conversation streaming-event sequencing. + * + * The streaming architecture (see `.private/plans/streaming-message-architecture.md`) + * tags every event the daemon emits for a conversation with a monotonically + * increasing `seq`. The sequence persists across daemon restarts and + * conversation evictions via the durable `conversation_events` table: + * `nextSeq` lazily reseeds the in-memory counter from the max persisted + * `seq` the first time a conversation surfaces (after boot, or after an + * eviction that called {@link resetSeq}). 
Subsequent calls bump the + * in-memory counter without touching the DB. + * + * Reseeding from durable state is required so post-eviction emits cannot + * collide with rows the previous incarnation already wrote — replay + * relies on `(conversation_id, seq)` being globally unique within the + * retention window. + * + * The counter is intentionally a module-level map keyed by conversationId + * (rather than living on `Conversation` / `AgentLoopConversationContext`) + * so non-agent-loop emit paths — slash commands, surface broadcasts, + * aux notifier injections — can stamp `seq` without having to thread the + * conversation context through every call site. + */ + +import { maxSeqForConversation } from "./event-log.js"; + +const seqCounters = new Map(); + +/** + * Return the next monotonic `seq` for the given conversation. + * + * On first access for a conversation, the counter is seeded from the max + * persisted `seq` in `conversation_events` so a daemon restart or a + * post-eviction re-entry cannot replay an existing seq value. + */ +export function nextSeq(conversationId: string): number { + let current = seqCounters.get(conversationId); + if (current === undefined) { + current = maxSeqForConversation(conversationId); + } + const next = current + 1; + seqCounters.set(conversationId, next); + return next; +} + +/** Read the current seq counter without advancing it (testing only). */ +export function peekSeq(conversationId: string): number { + return seqCounters.get(conversationId) ?? 0; +} + +/** Drop the seq counter for a conversation. Used when a conversation is + * evicted/destroyed so process memory does not grow unbounded. The next + * `nextSeq` for this conversation will reseed from durable storage. 
*/ +export function resetSeq(conversationId: string): void { + seqCounters.delete(conversationId); +} diff --git a/assistant/src/memory/conversation-crud.ts b/assistant/src/memory/conversation-crud.ts index 594282e2cf4..6b555e71fa3 100644 --- a/assistant/src/memory/conversation-crud.ts +++ b/assistant/src/memory/conversation-crud.ts @@ -306,6 +306,13 @@ interface InsertMessageCoreParams { content: string; metadata?: Record; clientMessageId?: string; + /** + * Optional pre-allocated message id. Callers that need a stable id before + * the row is persisted (the streaming agent loop assigns a UUIDv7 at + * message-open time so every event for the turn can stamp it) pass it + * here; the insert uses the supplied id instead of minting a fresh one. + */ + messageId?: string; } /** @@ -332,9 +339,16 @@ interface InsertMessageCoreParams { async function insertMessageCore( params: InsertMessageCoreParams, ): Promise { - const { conversationId, role, content, metadata, clientMessageId } = params; + const { + conversationId, + role, + content, + metadata, + clientMessageId, + messageId: preAssignedMessageId, + } = params; const db = getDb(); - const messageId = uuid(); + const messageId = preAssignedMessageId ?? uuid(); if (metadata) { const result = messageMetadataSchema.safeParse(metadata); @@ -1191,7 +1205,7 @@ export async function addMessage( role: MessageRole, content: string, metadata?: Record, - opts?: { skipIndexing?: boolean }, + opts?: { skipIndexing?: boolean; messageId?: string }, clientMessageId?: string, ) { const inserted = await insertMessageCore({ @@ -1200,6 +1214,7 @@ export async function addMessage( content, metadata, clientMessageId, + ...(opts?.messageId ? 
{ messageId: opts.messageId } : {}), }); if (inserted.deduplicated) { diff --git a/assistant/src/memory/db-init.ts b/assistant/src/memory/db-init.ts index 577d18b3be5..4185dd25140 100644 --- a/assistant/src/memory/db-init.ts +++ b/assistant/src/memory/db-init.ts @@ -67,6 +67,7 @@ import { migrateContactsRolePrincipal, migrateContactsUserFileColumn, migrateConversationCleanedAt, + migrateConversationEvents, migrateConversationForkLineage, migrateConversationHostAccess, migrateConversationInferenceProfileSession, @@ -466,6 +467,7 @@ export function initializeDb(): void { migrateLlmRequestLogCallSite, migrateDropProviderConnectionStatus, migrateMessagesClientMessageId, + migrateConversationEvents, ]; // Run each migration step, catching and logging individual failures so one diff --git a/assistant/src/memory/migrations/267-conversation-events.ts b/assistant/src/memory/migrations/267-conversation-events.ts new file mode 100644 index 00000000000..63cab0e4cb4 --- /dev/null +++ b/assistant/src/memory/migrations/267-conversation-events.ts @@ -0,0 +1,56 @@ +import { type DrizzleDb, getSqliteFrom } from "../db-connection.js"; + +/** + * Create the `conversation_events` table — a durable, append-only log of + * streaming events emitted during an assistant turn. + * + * Each row records one event the daemon published over the SSE channel for + * a conversation, tagged with its per-conversation monotonic `seq`. The + * table backs `Last-Event-Id` replay on SSE reconnect (see PR 2 of the + * streaming-message-architecture plan) so a client that drops a stream + * mid-turn can resume without losing or duplicating events. + * + * Columns: + * - `conversation_id` — owner conversation; rows are scoped per-conversation + * for cheap range scans on reconnect replay. + * - `seq` — the per-conversation monotonic sequence stamped on the event by + * {@link nextSeq}. The composite `(conversation_id, seq)` is the natural + * primary key and the only index used during replay. 
+ * - `event_type` — the `ServerMessage.type` discriminant (e.g. + * `assistant_text_delta`, `block_open`, `message_close`). + * - `message_id` — the assistant `messageId` the event references, when one + * is in scope. Stored separately from the payload so future queries can + * filter by message without parsing the JSON blob. + * - `block_index` — the 0-based block coordinate within the message, when + * applicable. Same rationale as `message_id`. + * - `payload_json` — the full event payload, serialized so the SSE handshake + * can replay it verbatim. + * - `created_at` — wall-clock insertion time (epoch ms). Drives the startup + * sweep that drops rows older than the retention window. + * + * Idempotent — re-running the migration is a no-op once the table and index + * exist. Uses CREATE TABLE/INDEX IF NOT EXISTS so no checkpoint entry is + * required. + */ +export function migrateConversationEvents(database: DrizzleDb): void { + const raw = getSqliteFrom(database); + raw.exec(/*sql*/ ` + CREATE TABLE IF NOT EXISTS conversation_events ( + conversation_id TEXT NOT NULL, + seq INTEGER NOT NULL, + event_type TEXT NOT NULL, + message_id TEXT, + block_index INTEGER, + payload_json TEXT NOT NULL, + created_at INTEGER NOT NULL, + PRIMARY KEY (conversation_id, seq) + ) + `); + // Time-range index for the cleanup task that trims rows older than the + // retention window. The composite primary key already covers the replay + // path, so no additional `(conversation_id, seq)` index is needed.
+ raw.exec(/*sql*/ ` + CREATE INDEX IF NOT EXISTS idx_conversation_events_created_at + ON conversation_events (created_at) + `); +} diff --git a/assistant/src/memory/migrations/index.ts b/assistant/src/memory/migrations/index.ts index 1419bdf3a50..a16ae4f6dd4 100644 --- a/assistant/src/memory/migrations/index.ts +++ b/assistant/src/memory/migrations/index.ts @@ -253,6 +253,7 @@ export { export { migrateLlmRequestLogCallSite } from "./264-llm-request-log-call-site.js"; export { migrateDropProviderConnectionStatus } from "./265-drop-provider-connection-status.js"; export { migrateMessagesClientMessageId } from "./266-messages-client-message-id.js"; +export { migrateConversationEvents } from "./267-conversation-events.js"; export { MIGRATION_REGISTRY, type MigrationRegistryEntry, diff --git a/assistant/src/plugins/types.ts b/assistant/src/plugins/types.ts index 362806aadb8..2835d4022da 100644 --- a/assistant/src/plugins/types.ts +++ b/assistant/src/plugins/types.ts @@ -470,7 +470,16 @@ export type PersistAddArgs = { readonly role: MessageRole; readonly content: string; readonly metadata?: Record; - readonly addOptions?: { readonly skipIndexing?: boolean }; + readonly addOptions?: { + readonly skipIndexing?: boolean; + /** + * Optional pre-allocated message id. The streaming agent loop pre-assigns + * a UUIDv7 at message-open time so every event in the turn can stamp it; + * when the row finally persists it must reuse the same id. Plugin + * middleware should pass this through unchanged. + */ + readonly messageId?: string; + }; /** * When `true`, the default plugin additionally invokes * {@link syncMessageToDisk} with the returned message's id. 
Requires diff --git a/assistant/src/runtime/routes/events-routes.ts b/assistant/src/runtime/routes/events-routes.ts index 4a2fec318a7..d4d2ba3d492 100644 --- a/assistant/src/runtime/routes/events-routes.ts +++ b/assistant/src/runtime/routes/events-routes.ts @@ -26,10 +26,17 @@ import { z } from "zod"; import type { HostProxyCapability } from "../../channels/types.js"; import { parseInterfaceId, supportsHostProxy } from "../../channels/types.js"; import { emitContactChange } from "../../contacts/contact-events.js"; +import { readEventsAfter } from "../../daemon/event-log.js"; +import type { ServerMessage } from "../../daemon/message-protocol.js"; import { getConversation } from "../../memory/conversation-crud.js"; import { getOrCreateConversation } from "../../memory/conversation-key-store.js"; import { getLogger } from "../../util/logger.js"; -import { formatSseFrame, formatSseHeartbeat } from "../assistant-event.js"; +import type { AssistantEvent } from "../assistant-event.js"; +import { + buildAssistantEvent, + formatSseFrame, + formatSseHeartbeat, +} from "../assistant-event.js"; import type { AssistantEventCallback, AssistantEventFilter, @@ -240,10 +247,24 @@ const defaultSseShedReporter: SseShedReporter = (reason, inst) => { * Headers (optional): * X-Vellum-Client-Id -- stable per-install UUID identifying this client. * X-Vellum-Interface-Id -- interface type (e.g. "macos", "ios", "web"). + * Last-Event-Id -- last `seq` the client successfully processed for + * this conversation. When set, the daemon replays + * persisted events with `seq > last_event_id` from + * `conversation_events` before fanning out live + * events. Standard SSE field — `EventSource` + * sends it automatically on reconnect. Only honored + * when the stream is scoped to one conversation + * (via `conversationId` or `conversationKey`). + * + * When the client-id and interface-id headers are both present, the subscriber + * is registered as a hub client with metadata (interfaceId, capabilities).
The + * hub handles lifecycle — dispose() unregisters the client automatically. * - * When both are present, the subscriber is registered as a client in the - * event hub with metadata (interfaceId, capabilities). The hub handles - * lifecycle — dispose() unregisters the client automatically. + * Query params (optional): + * lastEventId -- alternative to the `Last-Event-Id` header for + * transports that cannot set arbitrary headers + * (mobile WKWebView quirks, native SDKs). Header + * wins if both are supplied. * * Options (for testing): * hub -- override the event hub (defaults to process singleton). @@ -281,6 +302,19 @@ export function handleSubscribeAssistantEvents( const rawInterfaceId = headers?.["x-vellum-interface-id"]; const rawMachineName = headers?.["x-vellum-machine-name"]; const rawActorPrincipalId = headers?.["x-vellum-actor-principal-id"]; + // `Last-Event-Id` is the standard SSE reconnect field; `lastEventId` query + // param is the fallback for transports that cannot set arbitrary headers. + // Header wins when both are supplied so an explicit reconnect always + // outranks a stale URL. + const rawLastEventId = + headers?.["last-event-id"]?.trim() || + queryParams?.lastEventId?.trim() || + ""; + const lastEventIdParsed = rawLastEventId ? Number(rawLastEventId) : NaN; + const lastEventId = + Number.isFinite(lastEventIdParsed) && lastEventIdParsed >= 0 + ? Math.trunc(lastEventIdParsed) + : null; const clientId = rawClientId?.trim() || null; const interfaceId = clientId ? parseInterfaceId(rawInterfaceId?.trim()) @@ -357,6 +391,39 @@ export function handleSubscribeAssistantEvents( conversationKey: scopeConversationKey, }; + // -- Last-Event-Id replay state -------------------------------------------- + // When the client supplies a `Last-Event-Id`, we replay persisted events + // with `seq > lastEventId` from the durable log before delivering live + // events. 
Two pieces of state coordinate the handoff so neither path + // delivers a duplicate: + // + // - `replayMaxSeq` — the highest seq written to the client during + // replay. Live events with `seq <= replayMaxSeq` are dropped by the + // callback because the replay already covered them. + // - `replayComplete` + `pendingLiveEvents` — until replay finishes, + // live events are buffered in order rather than written directly to + // the SSE stream, so replay and live can never interleave. + // + // When replay is not requested (no `lastEventId`, or an unscoped stream) the + // dedupe floor stays 0 so the callback writes live events straight through. + const replayRequested = + lastEventId !== null && filter.conversationId !== undefined; + let replayMaxSeq = replayRequested && lastEventId !== null ? lastEventId : 0; + let replayComplete = !replayRequested; + const pendingLiveEvents: AssistantEvent[] = []; + + function writeEventFrame(event: AssistantEvent): void { + const controller = controllerRef; + if (!controller) return; + controller.enqueue(encoder.encode(formatSseFrame(event))); + instrumentation.eventsDelivered += 1; + } + + function eventSeq(event: AssistantEvent): number | undefined { + const msg = event.message as { seq?: unknown }; + return typeof msg.seq === "number" ? msg.seq : undefined; + } + ensureEventLoopDelayMonitorStarted(); function cleanup() { @@ -381,8 +448,19 @@ export function handleSubscribeAssistantEvents( cleanup(); return; } - controller.enqueue(encoder.encode(formatSseFrame(event))); - instrumentation.eventsDelivered += 1; + // Buffer live events until replay completes so the SSE stream observes + // replayed-then-live order. After replay, fall through to the dedupe + // check that drops events the replay already covered. + if (!replayComplete) { + pendingLiveEvents.push(event); + return; + } + const seq = eventSeq(event); + if (seq !== undefined && seq <= replayMaxSeq) { + // Replay already delivered this seq (or a higher one) — drop.
+ return; + } + writeEventFrame(event); } catch { sub.dispose(); cleanup(); @@ -434,6 +512,56 @@ export function handleSubscribeAssistantEvents( controller.enqueue(encoder.encode(formatSseHeartbeat())); instrumentation.heartbeatsSent += 1; + // Replay persisted events with `seq > lastEventId` for this + // conversation, then drain anything the live callback buffered + // while we were reading from the DB. Skipping the conversation- + // unscoped case is intentional — replay needs a single + // conversation to scope the query and is the only mode the + // reconnect protocol cares about. + if (replayRequested && filter.conversationId) { + try { + const rows = readEventsAfter( + filter.conversationId, + lastEventId ?? 0, + ); + for (const row of rows) { + try { + const payload = JSON.parse(row.payloadJson) as ServerMessage; + const replayEvent = buildAssistantEvent( + payload, + filter.conversationId, + ); + writeEventFrame(replayEvent); + if (row.seq > replayMaxSeq) replayMaxSeq = row.seq; + } catch (err) { + log.warn( + { + err, + conversationId: filter.conversationId, + seq: row.seq, + }, + "Failed to replay conversation event (skipping)", + ); + } + } + } catch (err) { + log.warn( + { err, conversationId: filter.conversationId, lastEventId }, + "Failed to read events for Last-Event-Id replay", + ); + } + } + + // Drain any live events buffered while replay ran, dropping any + // whose seq the replay already covered. + replayComplete = true; + for (const buffered of pendingLiveEvents) { + const seq = eventSeq(buffered); + if (seq !== undefined && seq <= replayMaxSeq) continue; + writeEventFrame(buffered); + } + pendingLiveEvents.length = 0; + heartbeatTimer = setInterval(() => { try { if (controller.desiredSize != null && controller.desiredSize <= 0) { @@ -518,6 +646,11 @@ export const ROUTES: RouteDefinition[] = [ description: "Scope to a single conversation by an external key (non-vellum channels) or the web idempotency key. Materializes a row on first use. 
Ignored when conversationId is also provided.", }, + { + name: "lastEventId", + description: + "Last per-conversation seq the client successfully processed. Equivalent to the standard `Last-Event-Id` SSE header; provided as a query param for transports that cannot set arbitrary request headers. The daemon replays persisted events with `seq > lastEventId` before delivering live events.", + }, ], responseHeaders: { "Content-Type": "text/event-stream", diff --git a/clients/macos/vellum-assistant/App/AppDelegate+ConnectionSetup.swift b/clients/macos/vellum-assistant/App/AppDelegate+ConnectionSetup.swift index 7ed42b356b6..d2bad440c02 100644 --- a/clients/macos/vellum-assistant/App/AppDelegate+ConnectionSetup.swift +++ b/clients/macos/vellum-assistant/App/AppDelegate+ConnectionSetup.swift @@ -219,7 +219,7 @@ extension AppDelegate { eventSubscriptionTask?.cancel() eventSubscriptionTask = Task { @MainActor [weak self] in guard let self else { return } - let stream = self.eventStreamClient.subscribe() + let stream = self.eventStreamClient.subscribeAppDelegateEvents() for await message in stream { guard !Task.isCancelled else { break } switch message { diff --git a/clients/macos/vellum-assistant/App/DiskPressureStatusStore.swift b/clients/macos/vellum-assistant/App/DiskPressureStatusStore.swift index 8203594c6c1..97968d12509 100644 --- a/clients/macos/vellum-assistant/App/DiskPressureStatusStore.swift +++ b/clients/macos/vellum-assistant/App/DiskPressureStatusStore.swift @@ -212,7 +212,7 @@ final class DiskPressureStatusStore { guard eventTask == nil, let eventStreamClient else { return } eventTask = Task { @MainActor [weak self] in guard let self else { return } - for await message in eventStreamClient.subscribe() { + for await message in eventStreamClient.subscribeDiskPressureEvents() { guard !Task.isCancelled else { break } self.handle(message) } diff --git a/clients/macos/vellum-assistant/ComputerUse/HostCuExecutor.swift 
b/clients/macos/vellum-assistant/ComputerUse/HostCuExecutor.swift index 41267eb2ab6..94c921dc2d4 100644 --- a/clients/macos/vellum-assistant/ComputerUse/HostCuExecutor.swift +++ b/clients/macos/vellum-assistant/ComputerUse/HostCuExecutor.swift @@ -32,7 +32,7 @@ enum HostCuExecutor { on client: GatewayConnectionManager, overlayProvider: @escaping @MainActor (_ conversationId: String, _ request: HostCuRequest) -> HostCuSessionProxy? = { _, _ in nil } ) { - // No-op: host CU requests are handled via EventStreamClient.subscribe() in AppDelegate. + // No-op: host CU requests are handled via EventStreamClient.subscribeAppDelegateEvents() in AppDelegate. } } diff --git a/clients/macos/vellum-assistant/Features/Chat/ChatView.swift b/clients/macos/vellum-assistant/Features/Chat/ChatView.swift index 71d858fed12..b1926179757 100644 --- a/clients/macos/vellum-assistant/Features/Chat/ChatView.swift +++ b/clients/macos/vellum-assistant/Features/Chat/ChatView.swift @@ -320,7 +320,12 @@ struct ChatView: View { VStack(spacing: 0) { MessageListView( // -- TranscriptProjector inputs -- - messages: viewModel.messages, + // Renders from the streaming-architecture `MessageStore` (via + // `renderedMessages`) instead of the legacy `messages` array. + // The legacy array is still populated by older streaming + // helpers, but those mutations no longer drive rendering — + // see `ChatViewModel.renderedMessages` for the merge rules. 
+ messages: viewModel.renderedMessages, messagesRevision: viewModel.messagesRevision, isSending: viewModel.isSending, isThinking: viewModel.isThinking, @@ -365,7 +370,7 @@ struct ChatView: View { // -- Projector-resolved state -- activePendingRequestId: viewModel.activePendingRequestId, // -- Pagination -- - paginatedVisibleMessages: viewModel.paginatedVisibleMessages, + paginatedVisibleMessages: viewModel.renderedPaginatedVisibleMessages, displayedMessageCount: viewModel.displayedMessageCount, hasMoreMessages: viewModel.hasMoreMessages, isLoadingMoreMessages: viewModel.isLoadingMoreMessages, diff --git a/clients/macos/vellum-assistant/Features/Chat/TranscriptProjector.swift b/clients/macos/vellum-assistant/Features/Chat/TranscriptProjector.swift index 36f89b1a275..cfa99da6234 100644 --- a/clients/macos/vellum-assistant/Features/Chat/TranscriptProjector.swift +++ b/clients/macos/vellum-assistant/Features/Chat/TranscriptProjector.swift @@ -50,11 +50,14 @@ enum TranscriptProjector { highlightedMessageId: UUID?, autoRoutedProfileLabel: String? = nil ) -> TranscriptRenderModel { - // Deduplicate visible messages (streaming can produce duplicate IDs). - let visibleMessages: [ChatMessage] = { - var seen = Set() - return paginatedVisibleMessages.filter { seen.insert($0.id).inserted } - }() + // Per streaming-message-architecture PR 6: the MessageStreamReducer is + // the sole writer for assistant content and applies events idempotently + // by `(messageId, blockIndex, seq)`. `renderedMessages` and + // `renderedPaginatedVisibleMessages` on `ChatViewModel` already merge + // legacy rows with `MessageStore` snapshots using stable, deterministic + // ids, so the projector's input is already unique by id and the + // previous defensive dedupe pass is no longer needed. 
+ let visibleMessages = paginatedVisibleMessages // --- Structural metadata --- diff --git a/clients/macos/vellum-assistant/Features/MainWindow/ConversationManager.swift b/clients/macos/vellum-assistant/Features/MainWindow/ConversationManager.swift index 877b45de336..26676da98c6 100644 --- a/clients/macos/vellum-assistant/Features/MainWindow/ConversationManager.swift +++ b/clients/macos/vellum-assistant/Features/MainWindow/ConversationManager.swift @@ -266,7 +266,7 @@ final class ConversationManager: ConversationRestorerDelegate { } Task { @MainActor [weak self] in guard let self else { return } - for await message in self.eventStreamClient.subscribe() { + for await message in self.eventStreamClient.subscribeConversationOrchestrationEvents() { switch message { case .conversationIdResolved(let localId, let serverId): self.resolveConversationId(from: localId, to: serverId) @@ -744,7 +744,9 @@ final class ConversationManager: ConversationRestorerDelegate { if markHistoryLoaded { viewModel.isHistoryLoaded = true } - viewModel.startMessageLoop() + // No-op when the VM was already initialized — chat-event subscription + // is started at VM init and runs for the VM lifetime. 
+ viewModel.startChatEventSubscription() listStore.conversations.insert(conversation, at: 0) selectionStore.chatViewModels[conversation.id] = viewModel @@ -1453,7 +1455,7 @@ final class ConversationManager: ConversationRestorerDelegate { if let viewModel = selectionStore.chatViewModels[existingConversation.id] { viewModel.conversationId = item.id viewModel.isChannelConversation = updatedConversation.isChannelConversation - viewModel.ensureMessageLoopStarted() + viewModel.startChatEventSubscription() } return existingConversation.id } @@ -1462,7 +1464,7 @@ final class ConversationManager: ConversationRestorerDelegate { let viewModel = makeViewModel() viewModel.conversationId = item.id viewModel.isChannelConversation = conversationModel.isChannelConversation - viewModel.startMessageLoop() + viewModel.startChatEventSubscription() listStore.conversations.insert(conversationModel, at: 0) selectionStore.chatViewModels[conversationModel.id] = viewModel diff --git a/clients/macos/vellum-assistant/Features/MainWindow/ConversationRestorer.swift b/clients/macos/vellum-assistant/Features/MainWindow/ConversationRestorer.swift index 66bfeb21dec..430cb7e3705 100644 --- a/clients/macos/vellum-assistant/Features/MainWindow/ConversationRestorer.swift +++ b/clients/macos/vellum-assistant/Features/MainWindow/ConversationRestorer.swift @@ -131,7 +131,7 @@ final class ConversationRestorer { func startObserving(skipInitialFetch: Bool = false) { Task { @MainActor [weak self] in guard let self else { return } - for await message in self.eventStreamClient.subscribe() { + for await message in self.eventStreamClient.subscribeConversationListEvents() { switch message { case .conversationListResponse(let response): // SSE-pushed responses don't have the foreground/background diff --git a/clients/macos/vellum-assistant/Features/MainWindow/ConversationSelectionStore.swift b/clients/macos/vellum-assistant/Features/MainWindow/ConversationSelectionStore.swift index 8821818f443..233e4af0029 100644 
--- a/clients/macos/vellum-assistant/Features/MainWindow/ConversationSelectionStore.swift +++ b/clients/macos/vellum-assistant/Features/MainWindow/ConversationSelectionStore.swift @@ -62,7 +62,7 @@ final class ConversationSelectionStore { activeConversation = listStore.conversationsByLocalId[conversationId] let vm = getOrCreateViewModel(for: conversationId) - vm?.ensureMessageLoopStarted() + vm?.startChatEventSubscription() onActiveConversationChanged?(conversationId) // Notify the daemon so it rebinds the socket to this conversation. @@ -391,7 +391,7 @@ final class ConversationSelectionStore { /// Returns an existing or newly-created ViewModel for a detached pop-out window. func viewModelForDetachedWindow(conversationLocalId: UUID) -> ChatViewModel? { let vm = getOrCreateViewModel(for: conversationLocalId) - vm?.ensureMessageLoopStarted() + vm?.startChatEventSubscription() return vm } diff --git a/clients/macos/vellum-assistant/Features/MainWindow/MainWindow.swift b/clients/macos/vellum-assistant/Features/MainWindow/MainWindow.swift index e2fc34df622..8659907a950 100644 --- a/clients/macos/vellum-assistant/Features/MainWindow/MainWindow.swift +++ b/clients/macos/vellum-assistant/Features/MainWindow/MainWindow.swift @@ -357,7 +357,7 @@ public final class MainWindow { documentManager.connectionManager = connectionManager Task { @MainActor [weak self] in guard let self else { return } - for await message in self.eventStreamClient.subscribe() { + for await message in self.eventStreamClient.subscribeTraceEvents() { switch message { case .traceEvent(let msg): self.traceStore.ingest(msg) diff --git a/clients/macos/vellum-assistant/Features/MainWindow/MainWindowView.swift b/clients/macos/vellum-assistant/Features/MainWindow/MainWindowView.swift index eb1bd31e965..cd308e85a9b 100644 --- a/clients/macos/vellum-assistant/Features/MainWindow/MainWindowView.swift +++ b/clients/macos/vellum-assistant/Features/MainWindow/MainWindowView.swift @@ -162,7 +162,7 @@ struct 
MainWindowView: View { // without an app relaunch. let homeStoreInstance = HomeStore( client: DefaultHomeStateClient(), - messageStream: eventStreamClient.subscribe() + messageStream: eventStreamClient.subscribeHomeEvents() ) self._homeStore = State(initialValue: homeStoreInstance) // Same eager-construction rationale for the activity feed store @@ -173,7 +173,7 @@ struct MainWindowView: View { // already looking at the feed. self._feedStore = State(initialValue: HomeFeedStore( client: DefaultHomeFeedClient(), - messageStream: eventStreamClient.subscribe(), + messageStream: eventStreamClient.subscribeHomeEvents(), onSSEUpdate: { [weak homeStoreInstance] in guard let homeStoreInstance else { return } if !homeStoreInstance.isHomeTabVisible { @@ -181,9 +181,9 @@ struct MainWindowView: View { } } )) - // Meet status panel subscribes to the same shared SSE stream. + // Meet status panel subscribes to the meet-domain dispatcher. self._meetStatusViewModel = State(initialValue: MeetStatusViewModel( - messageStream: eventStreamClient.subscribe() + messageStream: eventStreamClient.subscribeMeetEvents() )) } diff --git a/clients/macos/vellum-assistant/Features/MainWindow/ThreadWindowManager.swift b/clients/macos/vellum-assistant/Features/MainWindow/ThreadWindowManager.swift index 16f7540eb49..697cc6d58a9 100644 --- a/clients/macos/vellum-assistant/Features/MainWindow/ThreadWindowManager.swift +++ b/clients/macos/vellum-assistant/Features/MainWindow/ThreadWindowManager.swift @@ -74,7 +74,7 @@ final class ThreadWindowManager { ) threadWindows[conversationLocalId] = threadWindow - viewModel.ensureMessageLoopStarted() + viewModel.startChatEventSubscription() log.info("Opened thread window for \(conversationLocalId), \(self.threadWindows.count) total") return true diff --git a/clients/macos/vellum-assistant/Features/Settings/SettingsStore.swift b/clients/macos/vellum-assistant/Features/Settings/SettingsStore.swift index e69d77063bd..5d83330d25b 100644 --- 
a/clients/macos/vellum-assistant/Features/Settings/SettingsStore.swift +++ b/clients/macos/vellum-assistant/Features/Settings/SettingsStore.swift @@ -743,7 +743,7 @@ public final class SettingsStore: ObservableObject { // Subscribe to SSE-pushed config updates Task { @MainActor [weak self] in guard let self, let eventStreamClient = self.eventStreamClient else { return } - for await message in eventStreamClient.subscribe() { + for await message in eventStreamClient.subscribeSettingsEvents() { switch message { case .ingressConfigResponse(let response): self.handleIngressConfigResponse(response) diff --git a/clients/macos/vellum-assistantTests/TranscriptProjectorTests.swift b/clients/macos/vellum-assistantTests/TranscriptProjectorTests.swift index b9d2748cc92..373a4cce130 100644 --- a/clients/macos/vellum-assistantTests/TranscriptProjectorTests.swift +++ b/clients/macos/vellum-assistantTests/TranscriptProjectorTests.swift @@ -446,16 +446,23 @@ final class TranscriptProjectorTests: XCTestCase { XCTAssertFalse(model.canInlineProcessing) } - // MARK: - Duplicate Message Deduplication + // MARK: - Message Identity Trust + // + // Per streaming-message-architecture PR 6, the projector no longer + // dedupes by id — the `MessageStreamReducer` is the sole writer for + // assistant content and its idempotent apply means upstream input is + // already unique by id. The renderer trusts that contract. 
+ + func testProjectorPreservesInputOrderAndIdentity() { + let msg1 = makeMessage(role: .assistant, text: "First") + let msg2 = makeMessage(role: .user, text: "Second") + let msg3 = makeMessage(role: .assistant, text: "Third") - func testDuplicateMessageIdsAreDeduped() { - let sharedId = UUID() - let msg1 = makeMessage(id: sharedId, role: .assistant, text: "First") - let msg2 = makeMessage(id: sharedId, role: .assistant, text: "Duplicate") - - let model = project(messages: [msg1, msg2]) - XCTAssertEqual(model.rows.count, 1, "Duplicate IDs should be deduped") - XCTAssertEqual(model.rows[0].message.text, "First", "First occurrence wins") + let model = project(messages: [msg1, msg2, msg3]) + XCTAssertEqual(model.rows.count, 3, "All input rows pass through 1:1") + XCTAssertEqual(model.rows[0].message.text, "First") + XCTAssertEqual(model.rows[1].message.text, "Second") + XCTAssertEqual(model.rows[2].message.text, "Third") } // MARK: - Active Pending Request ID Pass-Through diff --git a/clients/shared/Features/Chat/ChatViewModel.swift b/clients/shared/Features/Chat/ChatViewModel.swift index 117c93783bb..9c22a6a39ec 100644 --- a/clients/shared/Features/Chat/ChatViewModel.swift +++ b/clients/shared/Features/Chat/ChatViewModel.swift @@ -216,6 +216,73 @@ public final class ChatViewModel: MessageSendCoordinatorDelegate { public var messagesRevision: UInt64 { messageManager.messagesRevision } + + /// Renderable transcript merged from the new `MessageStore` (streaming + /// architecture's source of truth) and the legacy `messages` array. + /// + /// Resolution rules: + /// - **Assistant messages**: rendered from `MessageStore` snapshots when + /// any are available. The reducer (`MessageStreamReducer`) is the sole + /// writer for assistant content under the new architecture, and its + /// idempotent apply is what makes the streaming-then-reload duplication + /// symptom structurally impossible. 
+ /// - **Legacy assistant rows are deduped by `daemonMessageId`**: a legacy + /// row whose `daemonMessageId` already exists in `MessageStore` is + /// dropped (the snapshot wins), and any remaining legacy assistant rows + /// without a matching snapshot are kept (covers history-loaded messages + /// that pre-date the new event protocol). + /// - **Non-assistant rows** (user bubbles, confirmations, system errors, + /// guardian decisions) flow through from the legacy array unchanged — + /// they are not yet part of the new event protocol. + /// + /// Ordering follows the legacy `messages` array for rows present there; + /// any MessageStore snapshot without a legacy anchor is appended at the + /// end in `messageOrder`. + public var renderedMessages: [ChatMessage] { + let snapshots = messageStore.messages + guard !snapshots.isEmpty else { return messages } + + var consumedDaemonIds = Set() + var result: [ChatMessage] = [] + result.reserveCapacity(messages.count + snapshots.count) + + for legacy in messages { + switch legacy.role { + case .assistant: + if let daemonId = legacy.daemonMessageId, + let snapshot = snapshots[daemonId] { + // Legacy row anchors a snapshot — render the snapshot + // and mark the daemon id consumed so the trailing + // append loop doesn't duplicate it. + result.append(MessageStore.chatMessage(from: snapshot)) + consumedDaemonIds.insert(daemonId) + } else if legacy.daemonMessageId != nil { + // Persisted assistant row from history with no live + // snapshot — render the legacy row as-is. + result.append(legacy) + } else { + // Streaming-bubble lazy-created by the legacy code path. + // Dropped: the corresponding MessageStore snapshot (if + // any) is the authoritative source and is appended + // below. This is what closes the duplication failure + // mode — the legacy "anonymous delta -> new bubble" + // branch no longer reaches the renderer. 
+ continue + } + case .user: + result.append(legacy) + } + } + + // Append any streamed assistant messages that don't have a legacy + // anchor. Stable order from `messageOrder` keeps SwiftUI identity + // diffing predictable across re-renders. + for snapshot in messageStore.orderedMessages { + guard !consumedDaemonIds.contains(snapshot.id) else { continue } + result.append(MessageStore.chatMessage(from: snapshot)) + } + return result + } public var inputText: String { get { messageManager.inputText } set { messageManager.inputText = newValue } @@ -708,6 +775,14 @@ public final class ChatViewModel: MessageSendCoordinatorDelegate { static let maxImageSize = ChatAttachmentManager.maxImageSize public let subagentDetailStore = SubagentDetailStore() + /// New transcript representation populated by `MessageStreamReducer` from + /// the daemon's PR-1 streaming events (`message_open` / `block_open` / + /// `block_close` / `message_close`). Assistant content on screen is + /// rendered from this store: `renderedMessages` merges its snapshots with + /// the legacy `messages` array, which still supplies user bubbles and + /// history rows that have no snapshot. + public let messageStore = MessageStore() + @ObservationIgnored let messageStreamReducer: MessageStreamReducer let connectionManager: GatewayConnectionManager let eventStreamClient: EventStreamClient private let settingsClient: any SettingsClientProtocol @@ -735,7 +810,6 @@ public final class ChatViewModel: MessageSendCoordinatorDelegate { public var conversationId: String? { didSet { - broadcastFilter.conversationId = conversationId // If the daemon reconnected before this VM had a conversation ID, a deferred // flush was requested. Now that we have a conversation, run it. if conversationId != nil && needsOfflineFlush { @@ -875,13 +949,12 @@ public final class ChatViewModel: MessageSendCoordinatorDelegate { /// (conversation_create sent, awaiting conversation_info).
Used by ConversationManager /// to decide whether it's safe to release the VM on archive. public var isBootstrapping: Bool { bootstrapCorrelationId != nil } - @ObservationIgnored var messageLoopTask: Task? - /// Monotonically increasing ID used to distinguish successive message-loop - /// tasks so that a cancelled loop's cleanup doesn't clear a newer replacement. - @ObservationIgnored private var messageLoopGeneration: UInt64 = 0 - /// Mutable filter shared with EventStreamClient so conversation-scoped SSE - /// messages are only delivered to the matching subscriber. - @ObservationIgnored private let broadcastFilter = EventStreamClient.ConversationFilter() + /// Single chat-event subscription that lives for the lifetime of this VM. + /// Started in `init` via `subscribeChatEvents()` and torn down in `deinit`. + /// Replaces the legacy `startMessageLoop` / `messageLoopGeneration` + /// machinery, which restarted the subscription per turn and produced a + /// double-subscriber window during the cancel-and-resubscribe gap. + @ObservationIgnored var chatEventSubscriptionTask: Task? @ObservationIgnored var currentAssistantMessageId: UUID? /// The trimmed user text that initiated the current assistant turn. /// Used to tag the assistant message (e.g. modelList for "/models") without @@ -1050,6 +1123,28 @@ public final class ChatViewModel: MessageSendCoordinatorDelegate { paginationState.paginatedVisibleMessages } + /// Renderable paginated suffix derived from `renderedMessages` (the + /// MessageStore-backed transcript) rather than the legacy + /// `paginationState.paginatedVisibleMessages` cache. Pagination math is + /// applied to the merged transcript so streaming bubbles always appear in + /// the latest window — without this, the legacy lazy-bubble suppression + /// in `renderedMessages` could leave the on-screen suffix empty during + /// short, single-message turns. 
+ public var renderedPaginatedVisibleMessages: [ChatMessage] { + let allRendered = ChatVisibleMessageFilter.visibleMessages(from: renderedMessages) + if paginationState.isShowAllMode { + let cap = ChatPaginationState.maxPaginatedWindowSize + if allRendered.count <= cap { return allRendered } + let defaultStart = allRendered.count - cap + let start = max(0, min(paginationState.windowOldestIndex ?? defaultStart, defaultStart)) + return Array(allRendered[start..<(start + cap)]) + } + if displayedMessageCount < allRendered.count { + return Array(allRendered.suffix(displayedMessageCount)) + } + return allRendered + } + /// Whether `paginatedVisibleMessages` is empty. Prefer over /// `paginatedVisibleMessages.isEmpty` to avoid observing the full array. public var isPaginatedEmpty: Bool { @@ -1286,6 +1381,13 @@ public final class ChatViewModel: MessageSendCoordinatorDelegate { self.interactionClient = interactionClient self.conversationQueueClient = conversationQueueClient self.onToolCallsComplete = onToolCallsComplete + // Initialize the new streaming-architecture reducer. The data path is + // wired so events accumulate into `messageStore`, which the chat list + // reads through `renderedMessages`. + self.messageStreamReducer = MessageStreamReducer( + store: messageStore, + eventStreamClient: eventStreamClient + ) self.paginationState = ChatPaginationState( messageManager: messageManager ) @@ -1313,6 +1415,31 @@ public final class ChatViewModel: MessageSendCoordinatorDelegate { // Initialize the action handler for server message dispatch. self.actionHandler = ChatActionHandler(viewModel: self) + // Subscribe to the typed chat-event dispatcher for the lifetime of + // this VM. The subscription does not restart per turn (which is what + // the legacy `startMessageLoop` did), so there is no window in which + // two overlapping subscribers can both apply the same event — one of + // the three root causes of the duplication bug this plan addresses.
+ startChatEventSubscription() + + // Start consuming streaming events into the new MessageStore. As of + // PR 4, the chat list renders from `renderedMessages` (which merges + // the legacy `messages` array with snapshots from this store via + // `MessageStore.chatMessage(from:)`). + self.messageStreamReducer.start() + + // Supply the persisted `Last-Event-Id` watermark on SSE (re)connect + // so the daemon's durable event log can skip events the client has + // already applied. The daemon only honors the header on + // conversation-scoped subscriptions; the current global stream + // ignores it, but the plumbing is correct for PR 5's per-conversation + // subscription model. + eventStreamClient.lastEventIdProvider = { [weak self] in + guard let self, let conversationId = self.conversationId else { return nil } + guard let seq = LastAppliedSeqStore.shared.seq(forConversation: conversationId) else { return nil } + return String(seq) + } + // Surface attachment validation errors in the error manager so the UI // can show them without the attachment manager needing a direct reference. attachmentManager.onError = { [weak self] message in @@ -1503,55 +1630,27 @@ public final class ChatViewModel: MessageSendCoordinatorDelegate { sendCoordinator.flushOfflineQueue() } - public func startMessageLoop() { - messageLoopTask?.cancel() - let messageStream = eventStreamClient.subscribe(filter: broadcastFilter) - - messageLoopGeneration &+= 1 - let generation = messageLoopGeneration - - messageLoopTask = Task { @MainActor [weak self] in - for await message in messageStream { + /// Start the chat-event subscription for the lifetime of this view model. + /// + /// Safe to call multiple times — no-ops once a subscription is already + /// active. The stream is owned by `EventStreamClient` and lives for as + /// long as the SSE connection (or this VM) does; we no longer tear it + /// down between turns.
Mid-turn connection drops are recovered through + /// the `daemonDidReconnect` / `eventStreamDidReconnect` notifications, + /// which run `handleTransportReconnect()` — see that method for the + /// spinner/cursor/history-catch-up logic that used to live inline in + /// the per-turn loop restart. + public func startChatEventSubscription() { + guard chatEventSubscriptionTask == nil else { return } + let stream = eventStreamClient.subscribeChatEvents() + chatEventSubscriptionTask = Task { @MainActor [weak self] in + for await message in stream { guard let self, !Task.isCancelled else { break } self.handleServerMessage(message) } - // Stream ended (e.g. daemon disconnected) — clear the task reference - // so the next sendUserMessage() call will re-subscribe. - // Only nil out if this task is still the current one; a cancelled - // loop that finishes after its replacement must not wipe the new - // task reference, which would cause duplicate subscriptions. - if self?.messageLoopGeneration == generation { - self?.messageLoopTask = nil - // Reset spinner state — if the connection drops mid-turn the client - // never receives message_complete, leaving the UI stuck. - self?.isThinking = false - self?.isSending = false - self?.isCancelling = false - // Stream dropped mid-turn — `message_complete` won't arrive, - // so clear pending turns to avoid bumping - // `interactiveTurnCompletionTick` on the next turn. - self?.messageManager.pendingUserTurnCount = 0 - self?.messageManager.staleCancelEventsExpected = 0 - if let existingId = self?.currentAssistantMessageId { - self?.messages.finalizeStreamingMessage(id: existingId, completeToolCalls: .none) - } - self?.clearCurrentTurnTracking() - self?.discardStreamingBuffer() - self?.discardPartialOutputBuffer() - // If a send-direct was pending when the stream dropped, - // dispatch it now so the message isn't silently lost. 
- self?.dispatchPendingSendDirect() - } } } - /// Start the daemon message stream if this chat has a bound conversation and - /// no active loop yet. - public func ensureMessageLoopStarted() { - guard conversationId != nil, messageLoopTask == nil else { return } - startMessageLoop() - } - /// Send a message to the daemon without showing a user bubble in the chat. /// Used for automated actions like inline model picker selections. /// Returns `true` if the message was sent (or a conversation bootstrap was started), @@ -1655,10 +1754,10 @@ public final class ChatViewModel: MessageSendCoordinatorDelegate { suggestion = nil pendingSuggestionRequestId = nil - // Make sure we're listening for the response - if messageLoopTask == nil { - startMessageLoop() - } + // Make sure we're listening for the response. Idempotent — the + // chat-event subscription is started once at init and lives for + // the lifetime of the VM. + startChatEventSubscription() Task { let success = await regenerateClient.regenerate(conversationId: conversationId) @@ -2553,7 +2652,7 @@ public final class ChatViewModel: MessageSendCoordinatorDelegate { // Cancel all Combine subscriptions first so no new work can be scheduled // from incoming publisher events while the remaining cleanup runs. cancellables.removeAll() - messageLoopTask?.cancel() + chatEventSubscriptionTask?.cancel() streamingFlushTask?.cancel() partialOutputFlushTask?.cancel() cancelTimeoutTask?.cancel() diff --git a/clients/shared/Features/Chat/LastAppliedSeqStore.swift b/clients/shared/Features/Chat/LastAppliedSeqStore.swift new file mode 100644 index 00000000000..353d5aa2ac1 --- /dev/null +++ b/clients/shared/Features/Chat/LastAppliedSeqStore.swift @@ -0,0 +1,69 @@ +import Foundation +import os + +private let log = Logger(subsystem: Bundle.appBundleIdentifier, category: "LastAppliedSeqStore") + +/// Persists the highest `seq` the client has successfully applied for each +/// conversation. 
The reducer (`MessageStreamReducer`) records every applied +/// event here; on SSE (re)connect, `EventStreamClient` reads the value back +/// and sends it as the `Last-Event-Id` watermark (a `?lastEventId=N` query +/// parameter, which the daemon accepts in place of the header) so the +/// durable event log only replays events the client hasn't seen. +/// +/// Backed by `UserDefaults` under the `vellum.streaming.lastAppliedSeq` +/// namespace. The value space is per-conversation — `setSeq` is a monotonic +/// max so out-of-order updates can't move the watermark backward. +/// +/// Concurrency: the store is `Sendable`-safe via an `NSLock`; `UserDefaults` +/// itself is thread-safe but the read-then-write `max` operation needs +/// serialization to avoid losing updates under concurrent writers. +public final class LastAppliedSeqStore: @unchecked Sendable { + + public static let shared = LastAppliedSeqStore() + + private let defaults: UserDefaults + private let lock = NSLock() + private static let keyPrefix = "vellum.streaming.lastAppliedSeq." + + public init(defaults: UserDefaults = .standard) { + self.defaults = defaults + } + + /// Returns the stored seq for `conversationId`, or `nil` if the client + /// has never applied an event for this conversation. + public func seq(forConversation conversationId: String) -> Int? { + let key = Self.key(for: conversationId) + lock.lock() + defer { lock.unlock() } + guard defaults.object(forKey: key) != nil else { return nil } + return defaults.integer(forKey: key) + } + + /// Monotonically advance the stored seq for `conversationId`. A `seq` + /// less than or equal to the existing watermark is a no-op so replayed + /// events can't move the watermark backward. + public func setSeq(_ seq: Int, forConversation conversationId: String) { + guard seq > 0 else { return } + let key = Self.key(for: conversationId) + lock.lock() + defer { lock.unlock() } + let existing = defaults.object(forKey: key) != nil ?
defaults.integer(forKey: key) : 0 + if seq > existing { + defaults.set(seq, forKey: key) + } + } + + /// Clear the stored seq for `conversationId`. Called when the user + /// deletes a conversation so the next replay isn't gated on a stale + /// watermark. + public func clear(conversationId: String) { + let key = Self.key(for: conversationId) + lock.lock() + defer { lock.unlock() } + defaults.removeObject(forKey: key) + } + + private static func key(for conversationId: String) -> String { + keyPrefix + conversationId + } +} diff --git a/clients/shared/Features/Chat/MessageSendCoordinator.swift b/clients/shared/Features/Chat/MessageSendCoordinator.swift index c68eb5e9a38..8b7cd592838 100644 --- a/clients/shared/Features/Chat/MessageSendCoordinator.swift +++ b/clients/shared/Features/Chat/MessageSendCoordinator.swift @@ -62,7 +62,6 @@ protocol MessageSendCoordinatorDelegate: AnyObject { // MARK: - Actions func flushCoalescedPublish() - func startMessageLoop() func refreshGuardianPrompts() func discardStreamingBuffer() func discardPartialOutputBuffer() @@ -74,7 +73,6 @@ protocol MessageSendCoordinatorDelegate: AnyObject { var onConversationCreated: ((String) -> Void)? { get } var onFirstUserMessage: ((String) -> Void)? { get set } var onUserMessageSent: (() -> Void)? { get } - var messageLoopTask: Task? { get } } /// Side-effect coordinator that owns the message send/cancel/queue logic. @@ -380,8 +378,8 @@ final class MessageSendCoordinator { } } - // Subscribe to daemon stream - delegate.startMessageLoop() + // The chat-event subscription was started at VM init and lives + // for the lifetime of this view model — no per-send restart. // Generate conversation ID locally — conversation creation is implicit // for HTTP transport. The conversationKey acts as the conversation. 
@@ -516,10 +514,8 @@ final class MessageSendCoordinator { messageManager.pendingUserTurnCount += 1 } - // Make sure we're listening - if delegate.messageLoopTask == nil { - delegate.startMessageLoop() - } + // The chat-event subscription is started once at VM init and lives + // for the VM lifetime — no per-send (re)subscribe. // Consume pending onboarding context on the first send so it's // included in the POST body. Nil it out immediately so subsequent diff --git a/clients/shared/Features/Chat/MessageStore+ChatMessageBridge.swift b/clients/shared/Features/Chat/MessageStore+ChatMessageBridge.swift new file mode 100644 index 00000000000..844856c65b6 --- /dev/null +++ b/clients/shared/Features/Chat/MessageStore+ChatMessageBridge.swift @@ -0,0 +1,138 @@ +import Foundation + +/// Bridge from the new `MessageStore` (the streaming-message-architecture +/// reducer's source of truth) to the `ChatMessage` shape that the existing +/// chat renderer consumes. +/// +/// The chat view list renders from `ChatViewModel.renderedMessages`, which +/// merges the legacy `messages` array (user bubbles, history rows, system +/// messages, confirmations) with assistant content materialized from +/// `MessageStore` snapshots through this bridge. For any assistant message +/// the daemon emits over the new addressable-event protocol, the +/// `MessageStore` snapshot wins over the legacy row — the legacy bubble +/// created by `appendTextToCurrentMessage` is silently dropped from the +/// merged transcript. +/// +/// `MessageStreamReducer` is the sole writer for `MessageStore`, and its +/// `apply()` is idempotent per `(messageId, blockIndex, seq)`. This is what +/// makes the streaming-then-reload duplication symptom structurally +/// impossible: re-applying the same event (e.g. on `Last-Event-Id` reconnect +/// replay from PR 2) is a no-op, and the renderer always reads a single +/// source of truth keyed by the daemon's stable `messageId`. 
+/// +/// Non-assistant rows and history rows that pre-date the new event protocol +/// still flow through the legacy `messages` array, which is why the merge +/// in `ChatViewModel.renderedMessages` is still needed. +@MainActor +extension MessageStore { + + /// Materializes the store's snapshots as a `[ChatMessage]` ordered by + /// insertion (the order `upsertMessage` first saw each `messageId`). + /// + /// Each `MessageSnapshot` becomes one `ChatMessage`: + /// - `.text` blocks are joined as `textSegments` and placed in + /// `contentOrder` interleaved with tool calls in `blockIndex` order. + /// - `.toolUse` blocks become entries in `toolCalls` with input, result, + /// and completion state copied verbatim from the snapshot. + /// + /// The resulting `ChatMessage.id` is a deterministic UUID derived from + /// the daemon's stable `messageId` so the same snapshot always maps to + /// the same SwiftUI identity across re-renders. + public var chatMessages: [ChatMessage] { + orderedMessages.map { snapshot in + Self.chatMessage(from: snapshot) + } + } + + /// Materialize a single `MessageSnapshot` into the legacy `ChatMessage` + /// shape consumed by the existing renderer. + static func chatMessage(from snapshot: MessageSnapshot) -> ChatMessage { + let role: ChatRole = (snapshot.role == "user") ? .user : .assistant + + var textSegments: [String] = [] + var toolCalls: [ToolCallData] = [] + var contentOrder: [ContentBlockRef] = [] + + // Walk blocks in `blockIndex` order so rendering matches the wire + // ordering. The reducer enforces idempotency, so repeated apply() + // calls converge on the same shape here. + for (_, block) in snapshot.orderedBlocks { + switch block.type { + case .text: + let segIdx = textSegments.count + textSegments.append(block.text) + contentOrder.append(.text(segIdx)) + case .toolUse: + let tcIdx = toolCalls.count + let inputDict = block.toolInput ?? 
[:] + let inputSummary = HistoryReconstructionService.summarizeToolInputStatic(inputDict) + let inputFull = ToolCallData.formatAllToolInput(inputDict) + let inputRawValue = HistoryReconstructionService.extractToolInputStatic(inputDict) + var toolCall = ToolCallData( + toolName: block.toolName ?? "", + inputSummary: inputSummary, + inputFull: inputFull, + inputRawValue: inputRawValue, + result: block.toolResult?.result, + isError: block.toolResult?.isError ?? false, + isComplete: block.isComplete || block.toolResult != nil, + arrivedBeforeText: textSegments.isEmpty + ) + toolCall.toolUseId = block.toolUseId + toolCall.inputRawDict = inputDict.isEmpty ? nil : inputDict + if let result = block.toolResult { + toolCall.riskLevel = result.riskLevel + toolCall.riskReason = result.riskReason + toolCall.matchedTrustRuleId = result.matchedTrustRuleId + toolCall.approvalMode = result.approvalMode + toolCall.approvalReason = result.approvalReason + toolCall.riskThreshold = result.riskThreshold + toolCall.riskScopeOptions = result.riskScopeOptions + toolCall.riskAllowlistOptions = result.riskAllowlistOptions + toolCall.riskDirectoryScopeOptions = result.riskDirectoryScopeOptions + if let containerized = result.isContainerized { toolCall.isContainerized = containerized } + } + toolCalls.append(toolCall) + contentOrder.append(.toolCall(tcIdx)) + } + } + + var message = ChatMessage( + id: Self.deterministicUUID(for: snapshot.id), + role: role, + text: "", + isStreaming: !snapshot.isComplete + ) + message.textSegments = textSegments + message.toolCalls = toolCalls + message.contentOrder = contentOrder + message.daemonMessageId = snapshot.id + return message + } + + /// Maps a daemon `messageId` (UUIDv7 string) to a stable `Foundation.UUID` + /// usable as `ChatMessage.id`. Deterministic — the same string always + /// produces the same UUID — so SwiftUI sees a stable identity across + /// re-renders. 
UUIDv7 ids parse directly; non-UUID ids fall back to an + /// FNV-1a hash spread across 16 bytes (collisions astronomically + /// unlikely within a single conversation). + static func deterministicUUID(for messageId: String) -> UUID { + if let parsed = UUID(uuidString: messageId) { return parsed } + var h: UInt64 = 0xcbf29ce484222325 + let prime: UInt64 = 0x100000001b3 + for b in messageId.utf8 { + h ^= UInt64(b) + h = h &* prime + } + var bytes = uuid_t(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) + withUnsafeMutableBytes(of: &bytes) { buf in + for i in 0..<16 { + h ^= h &<< 13 + h ^= h &>> 7 + h ^= h &<< 17 + buf[i] = UInt8(truncatingIfNeeded: h &>> UInt64((i % 8) * 8)) + } + } + return UUID(uuid: bytes) + } +} diff --git a/clients/shared/Features/Chat/MessageStore.swift b/clients/shared/Features/Chat/MessageStore.swift new file mode 100644 index 00000000000..0926eacc77d --- /dev/null +++ b/clients/shared/Features/Chat/MessageStore.swift @@ -0,0 +1,165 @@ +import Foundation + +/// In-memory snapshot of a single content block within an assistant message. +/// +/// A block is either a streamed text block (`type == .text`) or a tool +/// invocation block (`type == .toolUse`). Mirrors the daemon's wire-protocol +/// block model so the reducer can populate fields verbatim from incoming +/// `block_open`, `assistant_text_delta`, `tool_use_start`, `tool_input_delta`, +/// `tool_result`, and `block_close` events. +/// +/// Idempotency invariant: every mutation in `MessageStreamReducer` is gated on +/// the `seq` watermark stored in `MessageSnapshot`, so re-applying a delivered +/// event is a no-op. +public struct BlockSnapshot: Sendable { + public enum Kind: Sendable, Equatable { + case text + case toolUse + } + + public var type: Kind + /// Accumulated text content for `.text` blocks. Empty for `.toolUse` blocks. + public var text: String + /// Tool name for `.toolUse` blocks. `nil` for `.text` blocks. + public var toolName: String? 
+ /// Stable tool-use id from the agent (correlates with confirmations and results). + public var toolUseId: String? + /// Accumulated raw input JSON streamed via `tool_input_delta`. Note this is + /// the streamed partial JSON shape — final structured input is delivered by + /// `tool_use_start` and stored in `toolInput`. + public var toolInputJson: String + /// Structured input record produced by the model (populated from `tool_use_start`). + public var toolInput: [String: AnyCodable]? + /// Final tool result payload (populated from `tool_result`). + public var toolResult: ToolResultMessage? + /// True after a matching `block_close` event has been observed. + public var isComplete: Bool + + public init( + type: Kind, + text: String = "", + toolName: String? = nil, + toolUseId: String? = nil, + toolInputJson: String = "", + toolInput: [String: AnyCodable]? = nil, + toolResult: ToolResultMessage? = nil, + isComplete: Bool = false + ) { + self.type = type + self.text = text + self.toolName = toolName + self.toolUseId = toolUseId + self.toolInputJson = toolInputJson + self.toolInput = toolInput + self.toolResult = toolResult + self.isComplete = isComplete + } +} + +/// In-memory snapshot of a single assistant message, indexed by `messageId`. +/// +/// Holds the message-level metadata declared by `message_open` plus the array +/// of content blocks declared by `block_open` / closed by `block_close`. The +/// `seqWatermarks` table tracks the highest `seq` applied per block (with +/// `-1` reserved as the message-level watermark) so the reducer can ignore +/// out-of-order or duplicate events. +public struct MessageSnapshot: Sendable { + /// Stable assistant-message id declared by `message_open`. + public let id: String + /// "assistant". + public var role: String + /// Blocks indexed by `blockIndex`. Sparse during streaming; entries are + /// created on first `block_open` for a given index. 
+ public var blocks: [Int: BlockSnapshot] + /// True after a matching `message_close` event has been observed. + public var isComplete: Bool + /// Highest `seq` applied per `(messageId, blockIndex)`. The sentinel + /// `blockIndex == -1` records message-level events (`message_open`, + /// `message_close`). New events with `seq <= watermark` are dropped. + public var seqWatermarks: [Int: Int] + + public init( + id: String, + role: String = "assistant", + blocks: [Int: BlockSnapshot] = [:], + isComplete: Bool = false, + seqWatermarks: [Int: Int] = [:] + ) { + self.id = id + self.role = role + self.blocks = blocks + self.isComplete = isComplete + self.seqWatermarks = seqWatermarks + } + + /// Ordered view of the blocks for rendering (sorted by `blockIndex`). + public var orderedBlocks: [(index: Int, block: BlockSnapshot)] { + blocks.keys.sorted().map { ($0, blocks[$0]!) } + } +} + +/// Reactive store of `MessageSnapshot` values keyed by `messageId`. +/// +/// Designed to be the new source of truth for the chat transcript, replacing +/// the legacy `messages` array on `ChatViewModel`. As of PR 3, the store is +/// populated by `MessageStreamReducer` from the SSE event stream but is **not +/// rendered by any view** — UI continues to read from the legacy state. PR 4 +/// in the streaming-message-architecture plan flips renderers to consume this +/// store. +/// +/// Mutations happen exclusively on the main actor (so SwiftUI observers see +/// consistent snapshots), matching the existing store pattern in this +/// directory (see `ContactsStore`, `DirectoryStore`, `SettingsStore`). +@MainActor @Observable +public final class MessageStore { + + /// Snapshots keyed by `messageId`. + public var messages: [String: MessageSnapshot] = [:] + + /// Insertion order of `messageId` for stable rendering. Populated when a + /// message is first inserted via `upsertMessage`. 
+ public var messageOrder: [String] = [] + + public init() {} + + // MARK: - Convenience accessors + + public func message(id: String) -> MessageSnapshot? { + messages[id] + } + + /// Ordered messages for rendering. Insertion order is preserved across + /// updates so streaming bubbles don't reshuffle as new blocks arrive. + public var orderedMessages: [MessageSnapshot] { + messageOrder.compactMap { messages[$0] } + } + + // MARK: - Mutation helpers + // + // Called by `MessageStreamReducer`. Public so tests can exercise the store + // directly, but production callers should always route through the reducer + // to preserve the seq-watermark idempotency invariant. + + /// Insert an empty message snapshot if one does not already exist. + /// Returns `true` if a new snapshot was inserted, `false` if the id was + /// already present (idempotent re-application). + @discardableResult + public func upsertMessage(id: String, role: String) -> Bool { + if messages[id] != nil { return false } + messages[id] = MessageSnapshot(id: id, role: role) + messageOrder.append(id) + return true + } + + public func updateMessage(id: String, mutate: (inout MessageSnapshot) -> Void) { + guard var snapshot = messages[id] else { return } + mutate(&snapshot) + messages[id] = snapshot + } + + /// Reset the store. Used when switching conversations or on logout. + public func reset() { + messages.removeAll() + messageOrder.removeAll() + } +} diff --git a/clients/shared/Features/Chat/MessageStreamReducer.swift b/clients/shared/Features/Chat/MessageStreamReducer.swift new file mode 100644 index 00000000000..dd10ca3d085 --- /dev/null +++ b/clients/shared/Features/Chat/MessageStreamReducer.swift @@ -0,0 +1,244 @@ +import Foundation + +/// Applies streaming SSE events to a `MessageStore`, producing the new +/// transcript representation introduced by the streaming-message-architecture +/// plan (see `.private/plans/streaming-message-architecture.md`). 
+/// +/// The reducer is **idempotent**: re-delivering an event that has already been +/// applied is a no-op. Idempotency is enforced by tracking the highest `seq` +/// applied per `(messageId, blockIndex)` — events with a `seq` less than or +/// equal to the watermark are silently dropped. Message-level events +/// (`message_open`, `message_close`) use the sentinel block index `-1`. +/// +/// As of PR 3, the reducer consumes the new event types from `EventStreamClient` +/// but **the resulting `MessageStore` is unused by any view**. The legacy +/// streaming path on `ChatViewModel` still drives the on-screen transcript. +/// PR 4 flips renderers to read from the store. +/// +/// Lifecycle: an instance is owned by `ChatViewModel` (one per chat scope). +/// Call `start()` after construction to begin consuming events; `stop()` to +/// tear down the subscription (or rely on `deinit`, which cancels the task). +@MainActor +public final class MessageStreamReducer { + /// Sentinel `blockIndex` used to record message-level events in the + /// per-message watermark table (`message_open`, `message_close`). + private static let messageLevelBlockIndex = -1 + + /// The store this reducer mutates. Owned by the caller. + public let store: MessageStore + + /// Persistent per-conversation `seq` watermark. Updated for every event + /// the reducer applies so the next SSE (re)connect can send + /// `Last-Event-Id` and skip re-applying durable-log replays. + private let lastAppliedSeqStore: LastAppliedSeqStore + + private let eventStreamClient: EventStreamClient + private var subscriptionTask: Task? 
+ + public init( + store: MessageStore, + eventStreamClient: EventStreamClient, + lastAppliedSeqStore: LastAppliedSeqStore = .shared + ) { + self.store = store + self.eventStreamClient = eventStreamClient + self.lastAppliedSeqStore = lastAppliedSeqStore + } + + deinit { + subscriptionTask?.cancel() + } + + /// Subscribe to the chat event dispatcher and feed every relevant + /// message into `apply(event:)`. Safe to call multiple times — + /// re-subscribes after cancelling the previous task. + public func start() { + subscriptionTask?.cancel() + subscriptionTask = Task { [weak self] in + guard let self else { return } + let stream = self.eventStreamClient.subscribeChatEvents() + for await message in stream { + if Task.isCancelled { return } + self.apply(event: message) + } + } + } + + /// Cancel the active subscription. The store is left as-is. + public func stop() { + subscriptionTask?.cancel() + subscriptionTask = nil + } + + // MARK: - Apply + + /// Apply a single `ServerMessage` to the underlying `MessageStore`. + /// Events that are not part of the new streaming protocol are ignored. + public func apply(event: ServerMessage) { + switch event { + case .messageOpen(let msg): + applyMessageOpen(msg) + case .blockOpen(let msg): + applyBlockOpen(msg) + case .blockClose(let msg): + applyBlockClose(msg) + case .messageClose(let msg): + applyMessageClose(msg) + case .assistantTextDelta(let msg): + applyTextDelta(msg) + case .toolUseStart(let msg): + applyToolUseStart(msg) + case .toolInputDelta(let msg): + applyToolInputDelta(msg) + case .toolResult(let msg): + applyToolResult(msg) + default: + // All other events are out of scope for the new streaming + // architecture and intentionally ignored. 
+ break + } + } + + // MARK: - Event handlers + + private func applyMessageOpen(_ msg: MessageOpenMessage) { + guard shouldApply(messageId: msg.messageId, blockIndex: Self.messageLevelBlockIndex, seq: msg.seq) else { return } + store.upsertMessage(id: msg.messageId, role: msg.role) + recordSeq(messageId: msg.messageId, blockIndex: Self.messageLevelBlockIndex, seq: msg.seq, conversationId: msg.conversationId) + } + + private func applyBlockOpen(_ msg: BlockOpenMessage) { + guard shouldApply(messageId: msg.messageId, blockIndex: msg.blockIndex, seq: msg.seq) else { return } + // Ensure the parent message exists. If `message_open` arrived first + // (the canonical ordering) this is a no-op; otherwise we synthesize a + // bare snapshot so the block has somewhere to live. + store.upsertMessage(id: msg.messageId, role: "assistant") + let kind: BlockSnapshot.Kind = (msg.blockType == "tool_use") ? .toolUse : .text + store.updateMessage(id: msg.messageId) { snapshot in + if snapshot.blocks[msg.blockIndex] == nil { + snapshot.blocks[msg.blockIndex] = BlockSnapshot( + type: kind, + toolName: msg.toolName, + toolUseId: msg.toolUseId + ) + } + } + recordSeq(messageId: msg.messageId, blockIndex: msg.blockIndex, seq: msg.seq, conversationId: msg.conversationId) + } + + private func applyBlockClose(_ msg: BlockCloseMessage) { + guard shouldApply(messageId: msg.messageId, blockIndex: msg.blockIndex, seq: msg.seq) else { return } + store.updateMessage(id: msg.messageId) { snapshot in + snapshot.blocks[msg.blockIndex]?.isComplete = true + } + recordSeq(messageId: msg.messageId, blockIndex: msg.blockIndex, seq: msg.seq, conversationId: msg.conversationId) + } + + private func applyMessageClose(_ msg: MessageCloseMessage) { + guard shouldApply(messageId: msg.messageId, blockIndex: Self.messageLevelBlockIndex, seq: msg.seq) else { return } + store.updateMessage(id: msg.messageId) { snapshot in + snapshot.isComplete = true + } + recordSeq(messageId: msg.messageId, blockIndex: 
Self.messageLevelBlockIndex, seq: msg.seq, conversationId: msg.conversationId) + } + + private func applyTextDelta(_ msg: AssistantTextDeltaMessage) { + guard let messageId = msg.messageId, let blockIndex = msg.blockIndex else { + // Synthetic / pre-anchor deltas are scoped to the legacy renderer. + return + } + guard shouldApply(messageId: messageId, blockIndex: blockIndex, seq: msg.seq) else { return } + // The block may not have been opened yet if a text delta beats its + // `block_open` event under network reordering. Lazily materialize a + // text block so the chunk isn't dropped. + store.upsertMessage(id: messageId, role: "assistant") + store.updateMessage(id: messageId) { snapshot in + if snapshot.blocks[blockIndex] == nil { + snapshot.blocks[blockIndex] = BlockSnapshot(type: .text) + } + snapshot.blocks[blockIndex]?.text.append(msg.text) + } + if let seq = msg.seq { + recordSeq(messageId: messageId, blockIndex: blockIndex, seq: seq, conversationId: msg.conversationId) + } + } + + private func applyToolUseStart(_ msg: ToolUseStartMessage) { + guard let messageId = msg.messageId, let blockIndex = msg.blockIndex else { return } + guard shouldApply(messageId: messageId, blockIndex: blockIndex, seq: msg.seq) else { return } + store.upsertMessage(id: messageId, role: "assistant") + store.updateMessage(id: messageId) { snapshot in + if snapshot.blocks[blockIndex] == nil { + snapshot.blocks[blockIndex] = BlockSnapshot(type: .toolUse) + } + snapshot.blocks[blockIndex]?.type = .toolUse + snapshot.blocks[blockIndex]?.toolName = msg.toolName + snapshot.blocks[blockIndex]?.toolUseId = msg.toolUseId + snapshot.blocks[blockIndex]?.toolInput = msg.input + } + if let seq = msg.seq { + recordSeq(messageId: messageId, blockIndex: blockIndex, seq: seq, conversationId: msg.conversationId) + } + } + + private func applyToolInputDelta(_ msg: ToolInputDeltaMessage) { + guard let messageId = msg.messageId, let blockIndex = msg.blockIndex else { return } + guard 
shouldApply(messageId: messageId, blockIndex: blockIndex, seq: msg.seq) else { return } + store.upsertMessage(id: messageId, role: "assistant") + store.updateMessage(id: messageId) { snapshot in + if snapshot.blocks[blockIndex] == nil { + snapshot.blocks[blockIndex] = BlockSnapshot(type: .toolUse, toolName: msg.toolName, toolUseId: msg.toolUseId) + } + snapshot.blocks[blockIndex]?.toolInputJson.append(msg.content) + } + if let seq = msg.seq { + recordSeq(messageId: messageId, blockIndex: blockIndex, seq: seq, conversationId: msg.conversationId) + } + } + + private func applyToolResult(_ msg: ToolResultMessage) { + guard let messageId = msg.messageId, let blockIndex = msg.blockIndex else { return } + guard shouldApply(messageId: messageId, blockIndex: blockIndex, seq: msg.seq) else { return } + store.upsertMessage(id: messageId, role: "assistant") + store.updateMessage(id: messageId) { snapshot in + if snapshot.blocks[blockIndex] == nil { + snapshot.blocks[blockIndex] = BlockSnapshot(type: .toolUse, toolName: msg.toolName, toolUseId: msg.toolUseId) + } + snapshot.blocks[blockIndex]?.toolResult = msg + } + if let seq = msg.seq { + recordSeq(messageId: messageId, blockIndex: blockIndex, seq: seq, conversationId: msg.conversationId) + } + } + + // MARK: - Idempotency + + /// Returns `true` when the event should be applied. The contract: + /// - If `seq` is `nil` (legacy daemon pre-PR-1), apply unconditionally — + /// we cannot deduplicate without a sequence number. The legacy renderer + /// already tolerates duplicates. + /// - Otherwise, drop events whose `seq` is `<=` the recorded watermark. + private func shouldApply(messageId: String, blockIndex: Int, seq: Int?) -> Bool { + guard let seq else { return true } + if let watermark = store.messages[messageId]?.seqWatermarks[blockIndex], + seq <= watermark { + return false + } + return true + } + + private func recordSeq(messageId: String, blockIndex: Int, seq: Int, conversationId: String?) 
{ + store.updateMessage(id: messageId) { snapshot in + if let existing = snapshot.seqWatermarks[blockIndex], existing >= seq { + return + } + snapshot.seqWatermarks[blockIndex] = seq + } + // Persist the per-conversation watermark so the next SSE (re)connect + // can send `Last-Event-Id` and skip durable-log replays the client + // has already applied (see `LastAppliedSeqStore`). + if let conversationId, !conversationId.isEmpty { + lastAppliedSeqStore.setSeq(seq, forConversation: conversationId) + } + } +} diff --git a/clients/shared/Features/Contacts/ContactsStore.swift b/clients/shared/Features/Contacts/ContactsStore.swift index dacf5197829..c48fc4ee69c 100644 --- a/clients/shared/Features/Contacts/ContactsStore.swift +++ b/clients/shared/Features/Contacts/ContactsStore.swift @@ -125,7 +125,7 @@ public final class ContactsStore { subscriptionTask?.cancel() subscriptionTask = Task { [weak self] in guard let self else { return } - let stream = self.eventStreamClient.subscribe() + let stream = self.eventStreamClient.subscribeContactsEvents() for await message in stream { guard !Task.isCancelled else { return } diff --git a/clients/shared/Features/Directory/DirectoryStore.swift b/clients/shared/Features/Directory/DirectoryStore.swift index b4fec09a315..47f3e8b3c04 100644 --- a/clients/shared/Features/Directory/DirectoryStore.swift +++ b/clients/shared/Features/Directory/DirectoryStore.swift @@ -191,7 +191,7 @@ public final class DirectoryStore: ObservableObject { appFilesChangedTask?.cancel() appFilesChangedTask = Task { [weak self] in guard let eventStreamClient = self?.eventStreamClient else { return } - let stream = eventStreamClient.subscribe() + let stream = eventStreamClient.subscribeAppFilesEvents() for await message in stream { guard let self, !Task.isCancelled else { return } diff --git a/clients/shared/Network/EventStreamClient.swift b/clients/shared/Network/EventStreamClient.swift index 9cf83ebe2d3..17e66ede5d1 100644 --- 
a/clients/shared/Network/EventStreamClient.swift +++ b/clients/shared/Network/EventStreamClient.swift @@ -79,48 +79,167 @@ private final class SSEHandshakeCaptureDelegate: NSObject, URLSessionDataDelegat } } -/// Client that manages an SSE connection to the assistant runtime and broadcasts -/// parsed `ServerMessage` values to multiple independent subscribers. +/// Routing category for parsed `ServerMessage` events. Each consumer subscribes +/// to exactly one category via the typed `subscribeXxxEvents()` entry points; +/// `EventStreamClient` parses the SSE stream once and dispatches each event to +/// the consumers in the categories that care about its type. +/// +/// Replaces the legacy single-broadcaster `subscribe(filter:)` model that +/// delivered every event to every subscriber. The legacy model caused two +/// concrete problems this enum addresses: +/// +/// 1. **Double-subscriber window on loop restart.** `ChatViewModel` used to +/// tear down and re-create its subscription every turn (`startMessageLoop`); +/// if a streaming event landed during that window, it could be applied twice +/// by two overlapping subscribers. Per-domain dispatchers let each consumer +/// subscribe once at init for the lifetime of its owner — no restart, no +/// overlap. +/// 2. **Unbounded fan-out per subscriber.** The legacy `AsyncStream` per +/// subscriber used the default unbounded buffer. Each typed dispatcher now +/// runs with `.bufferingNewest(eventStreamSubscriberBufferLimit)` so a slow +/// consumer cannot pin arbitrary memory. +public enum EventStreamCategory: Sendable { + /// Streaming chat events consumed by `ChatViewModel`'s action handler and + /// `MessageStreamReducer` — message lifecycle, blocks, deltas, tool use, + /// queue / dequeue, generation handoff, errors, surfaces (which inline- + /// render inside chat), confirmations, subagent events, and assistant + /// status. 
The set is intentionally broad because chat is the primary + /// consumer of most server events. + case chat + /// Conversation list management — `conversation_list_response`, + /// `history_response`, `conversation_title_updated`, + /// `conversation_list_invalidated`. Consumed by `ConversationRestorer`. + case conversationList + /// Conversation orchestration — `conversation_id_resolved`, + /// `conversation_inference_profile_updated`, ACP session lifecycle. + /// Consumed by `ConversationManager`. + case conversationOrchestration + /// Trace + usage telemetry consumed by `MainWindow`. + case trace + /// Disk pressure status + feature-flag changes consumed by + /// `DiskPressureStatusStore`. + case diskPressure + /// `contacts_changed` invalidations consumed by `ContactsStore`. + case contacts + /// `app_files_changed` invalidations consumed by `DirectoryStore`. + case appFiles + /// Settings store events — currently `ingress_config_response` and + /// `telegram_config_response`. + case settings + /// Home tab events — `relationship_state_updated`, `home_feed_updated`. + case home + /// Meet status events — `meet_*`. + case meet + /// App-delegate level cross-domain orchestration: notifications, + /// open_url, open_conversation, document editor, recording control, + /// identity / avatar / sounds / config / feature flags, host tool + /// requests + cancels, signing identity, sync invalidation, surface + /// dismiss/complete for the surface manager, conversation errors, etc. + case appDelegate +} + +/// Bound for each typed dispatcher's per-subscriber buffer. The SSE loop runs +/// on @MainActor and consumers are also @MainActor, so under normal load this +/// is empty most of the time. The bound exists to cap memory if a consumer +/// hangs — newer events take precedence over older buffered ones because the +/// streaming reducer is idempotent and a stale event has no value once a newer +/// one of the same kind has been seen. 
+private let eventStreamSubscriberBufferLimit = 256 + +/// Client that manages an SSE connection to the assistant runtime and routes +/// parsed `ServerMessage` values to typed per-domain dispatchers. /// /// Backed by `GatewayHTTPClient.stream()` for authenticated SSE connections. @MainActor public final class EventStreamClient { - // MARK: - Broadcast Subscribers - - /// Mutable filter that a subscriber can update as its conversation changes. - /// Passed by reference so callers can set `conversationId` after subscribing - /// (e.g. when `conversationInfo` arrives and assigns the conversation ID). - public final class ConversationFilter: @unchecked Sendable { - public var conversationId: String? - public init(conversationId: String? = nil) { self.conversationId = conversationId } - } + // MARK: - Typed Dispatchers private struct Subscription { let continuation: AsyncStream.Continuation - let filter: ConversationFilter? } - private var subscribers: [UUID: Subscription] = [:] + /// Subscribers grouped by category. Each category has its own subscriber + /// list so a `chat` consumer's slowness can't backpressure a + /// `conversationList` consumer (they share no buffer). + private var subscribers: [EventStreamCategory: [UUID: Subscription]] = [:] - /// Creates a new message stream for the caller. - /// - /// - Parameter filter: Optional conversation filter. When provided, - /// messages whose `conversationId` doesn't match are not delivered, - /// reducing unnecessary subscriber wakeups. Messages with no - /// `conversationId` (system-level) are always delivered. - public func subscribe(filter: ConversationFilter? 
= nil) -> AsyncStream { + private func makeSubscription(for category: EventStreamCategory) -> AsyncStream { let id = UUID() - let (stream, continuation) = AsyncStream.makeStream() - subscribers[id] = Subscription(continuation: continuation, filter: filter) + let (stream, continuation) = AsyncStream.makeStream( + bufferingPolicy: .bufferingNewest(eventStreamSubscriberBufferLimit) + ) + var bucket = subscribers[category] ?? [:] + bucket[id] = Subscription(continuation: continuation) + subscribers[category] = bucket continuation.onTermination = { [weak self] _ in Task { @MainActor [weak self] in - self?.subscribers.removeValue(forKey: id) + guard let self else { return } + var bucket = self.subscribers[category] ?? [:] + bucket.removeValue(forKey: id) + self.subscribers[category] = bucket } } return stream } + // MARK: - Typed Subscribe Entry Points + + /// Subscribe to streaming chat events. See ``EventStreamCategory/chat``. + public func subscribeChatEvents() -> AsyncStream { + makeSubscription(for: .chat) + } + + /// Subscribe to conversation list management events. + public func subscribeConversationListEvents() -> AsyncStream { + makeSubscription(for: .conversationList) + } + + /// Subscribe to conversation orchestration events. + public func subscribeConversationOrchestrationEvents() -> AsyncStream { + makeSubscription(for: .conversationOrchestration) + } + + /// Subscribe to trace + usage telemetry events. + public func subscribeTraceEvents() -> AsyncStream { + makeSubscription(for: .trace) + } + + /// Subscribe to disk pressure + feature flag events. + public func subscribeDiskPressureEvents() -> AsyncStream { + makeSubscription(for: .diskPressure) + } + + /// Subscribe to contacts invalidation events. + public func subscribeContactsEvents() -> AsyncStream { + makeSubscription(for: .contacts) + } + + /// Subscribe to app-file change invalidations. 
+ public func subscribeAppFilesEvents() -> AsyncStream { + makeSubscription(for: .appFiles) + } + + /// Subscribe to settings store events. + public func subscribeSettingsEvents() -> AsyncStream { + makeSubscription(for: .settings) + } + + /// Subscribe to home / feed events. + public func subscribeHomeEvents() -> AsyncStream { + makeSubscription(for: .home) + } + + /// Subscribe to meet status events. + public func subscribeMeetEvents() -> AsyncStream { + makeSubscription(for: .meet) + } + + /// Subscribe to the app-delegate orchestration stream. + public func subscribeAppDelegateEvents() -> AsyncStream { + makeSubscription(for: .appDelegate) + } + // MARK: - SSE State private var sseTask: Task? @@ -168,6 +287,15 @@ public final class EventStreamClient { /// Called when a token_rotated event is received. var onTokenRefreshed: ((String) -> Void)? + /// Resolves the `Last-Event-Id` value to send on the next SSE connect / + /// reconnect. Returning `nil` skips the header. Set by `ChatViewModel` + /// from the persisted `LastAppliedSeqStore` watermark for the currently + /// active conversation. The daemon only honors the header when the + /// stream is conversation-scoped — today's global `/v1/events` stream + /// ignores it, but the wiring is in place for PR 5's per-conversation + /// subscription model. + public var lastEventIdProvider: (() -> String?)? 
+ // MARK: - Init @@ -216,8 +344,10 @@ public final class EventStreamClient { func teardown() { shouldReconnect = false stopSSE() - for subscriber in subscribers.values { - subscriber.continuation.finish() + for bucket in subscribers.values { + for subscriber in bucket.values { + subscriber.continuation.finish() + } } subscribers.removeAll() } @@ -402,10 +532,12 @@ public final class EventStreamClient { do { await self.sseHandshakeDiagnostics.reset() + let lastEventId = self.lastEventIdProvider?() let (bytes, response) = try await GatewayHTTPClient.stream( path: "events", timeout: .infinity, - session: session + session: session, + lastEventId: lastEventId ) guard let http = response as? HTTPURLResponse, http.statusCode == 200 else { @@ -517,7 +649,6 @@ public final class EventStreamClient { // scheduled conversations — which never emit user_message_echo — // can't pollute the map with their conversationId during the window // between sendUserMessage and the HTTP 202 response. - var broadcastConversationId: String? if let conversationId = extractJsonStringValue(from: jsonString, key: "conversationId") { let eventType = extractJsonStringValue(from: jsonString, key: "type") let localId: String? @@ -534,7 +665,6 @@ public final class EventStreamClient { } else { localId = nil } - broadcastConversationId = localId ?? conversationId if let localId { jsonString = jsonString.replacingOccurrences( of: "\"conversationId\":\"\(conversationId)\"", @@ -607,7 +737,7 @@ public final class EventStreamClient { guard let message else { return } if shouldIgnoreHostToolRequest(message) { return } - handleParsedMessage(message, conversationId: broadcastConversationId) + handleParsedMessage(message) } private func shouldIgnoreHostToolRequest(_ message: ServerMessage) -> Bool { @@ -652,8 +782,8 @@ public final class EventStreamClient { /// Handle a successfully parsed server message: /// 1. Intercept token_rotated (update credentials, reconnect SSE) /// 2. 
Call pre-processor (DaemonStatus state updates) - /// 3. Broadcast to all subscribers - private func handleParsedMessage(_ message: ServerMessage, conversationId: String? = nil) { + /// 3. Route to typed dispatchers + private func handleParsedMessage(_ message: ServerMessage) { // Intercept token rotation — don't broadcast to subscribers if case .tokenRotated(let msg) = message { log.info("Received token_rotated event — reconnecting SSE") @@ -676,19 +806,144 @@ public final class EventStreamClient { } messagePreProcessor?(message) - broadcastMessage(message, conversationId: conversationId) + broadcastMessage(message) } - /// Broadcast a message to subscribers. When `conversationId` is provided, - /// subscribers with a non-matching conversation filter are skipped. - public func broadcastMessage(_ message: ServerMessage, conversationId: String? = nil) { - for subscriber in subscribers.values { - if let filterConvId = subscriber.filter?.conversationId, - let messageConvId = conversationId, - filterConvId != messageConvId { - continue + /// Dispatch a parsed event to every typed subscriber whose category + /// covers the event's kind. Public so tests and synthetic-event call + /// sites (e.g. `userMessagePersisted`) can fan a message into the + /// routing layer without going through the SSE parser. + public func broadcastMessage(_ message: ServerMessage) { + let categories = Self.categories(for: message) + for category in categories { + guard let bucket = subscribers[category] else { continue } + for subscriber in bucket.values { + subscriber.continuation.yield(message) } - subscriber.continuation.yield(message) + } + } + + /// Static category mapping. Each event type is routed to one or more + /// categories — for example, `appFilesChanged` goes to both `appFiles` + /// (DirectoryStore invalidation) and `chat` (ChatActionHandler's + /// surface-image refresh path). 
+ /// + /// The mapping is deliberately conservative: when a category cares about + /// a class of events (e.g. chat events) we include the full set so a new + /// streaming-related event landing in this file doesn't silently bypass + /// the consumer that needs it. Cross-domain orchestration events (host + /// tools, document editor, recording, etc.) all flow through + /// `appDelegate`. + private static func categories(for message: ServerMessage) -> [EventStreamCategory] { + switch message { + // MARK: Chat streaming + lifecycle + case .messageOpen, .blockOpen, .blockClose, .messageClose, + .assistantTextDelta, .assistantThinkingDelta, .assistantActivityState, + .toolUseStart, .toolUsePreviewStart, .toolInputDelta, .toolOutputChunk, .toolResult, + .messageComplete, .messageQueued, .messageDequeued, .messageRequestComplete, + .messageQueuedDeleted, .messageSteered, .generationCancelled, .generationHandoff, + .userMessageEcho, .userMessagePersisted, .queuedMessageAcked, + .conversationInfo, .conversationError, .confirmationRequest, .confirmationStateChanged, + .undoComplete, .suggestionResponse, .error, + .watchStarted, .watchCompleteRequest, + .subagentSpawned, .subagentStatusChanged, .subagentEvent, + .contextCompacted, .compactionCircuitOpen, .compactionCircuitClosed, + .memoryStatus, .memoryRecalled, + .modelInfo, .guardianActionsPendingResponse, .turnProfileAutoRouted, + .usageProgress: + return [.chat] + + // MARK: UI surfaces — chat (inline) + app delegate (overlay surface manager) + case .uiSurfaceShow, .uiSurfaceUpdate, .uiSurfaceUndoResult: + return [.chat, .appDelegate] + case .uiSurfaceDismiss, .uiSurfaceComplete: + return [.chat, .appDelegate] + case .uiLayoutConfig: + return [.appDelegate] + + // MARK: Conversation list management + case .conversationListResponse, .historyResponse, + .conversationTitleUpdated, .conversationListInvalidated: + return [.conversationList] + + // MARK: Conversation orchestration + case .conversationIdResolved, 
.conversationInferenceProfileUpdated, + .acpSessionSpawned, .acpSessionUpdate, .acpSessionCompleted, .acpSessionError: + return [.conversationOrchestration] + + // MARK: Trace + usage telemetry + case .traceEvent: + return [.trace] + case .usageUpdate: + // Chat consumes for in-conversation usage attribution; the main + // window's trace category resets the dashboard. + return [.chat, .trace] + + // MARK: Disk pressure + case .diskPressureStatusChanged: + return [.diskPressure] + + // MARK: Feature flag changes — disk pressure (UI-impacting) + + // app delegate (kicks off reload). + case .featureFlagsChanged: + return [.diskPressure, .appDelegate] + + // MARK: Contacts invalidation + case .contactsChanged: + return [.contacts] + + // MARK: Workspace app files + case .appFilesChanged: + // DirectoryStore invalidates its app list; chat's action handler + // refreshes surface preview images for the active conversation; + // app delegate refreshes the in-memory apps cache. + return [.appFiles, .chat, .appDelegate] + + // MARK: Settings store + case .ingressConfigResponse, .telegramConfigResponse: + return [.settings] + + // MARK: Home / feed + case .relationshipStateUpdated, .homeFeedUpdated: + return [.home] + + // MARK: Meet status + case .meetJoining, .meetJoined, .meetLeft, .meetError, + .meetParticipantChanged, .meetSpeakerChanged, .meetTranscriptChunk, + .meetChatSent, .meetSpeakingStarted, .meetSpeakingEnded: + return [.meet] + + // MARK: App-delegate orchestration + case .notificationIntent, .notificationConversationCreated, + .openUrl, .openConversation, .navigateSettings, + .showPlatformLogin, .platformDisconnected, + .taskRunConversationCreated, .scheduleConversationCreated, + .heartbeatConversationCreated, + .documentEditorShow, .documentEditorUpdate, + .documentSaveResponse, .documentLoadResponse, + .recordingStart, .recordingStop, .recordingPause, .recordingResume, + .clientSettingsUpdate, .identityChanged, .avatarUpdated, + .soundsConfigUpdated, 
.configChanged, + .syncChanged, + .bookmarkCreated, .bookmarkDeleted, + .hostBashRequest, .hostBashCancel, + .hostFileRequest, .hostFileCancel, + .hostCuRequest, .hostCuCancel, + .hostAppControlRequest, .hostAppControlCancel, + .hostBrowserRequest, .hostBrowserCancel, + .hostTransferRequest, .hostTransferCancel, + .signBundlePayload, .getSigningIdentity, + .secretRequest, .contactRequest, + .skillStateChanged, + .serviceGroupUpdateStarting, .serviceGroupUpdateProgress, .serviceGroupUpdateComplete: + return [.appDelegate] + + // MARK: Unrouted — pre-handled by the parsed-message interceptor + // (`tokenRotated`) or pure on-demand request/response payloads that + // are consumed directly by their HTTP/IPC initiators rather than + // through SSE fan-out. + default: + return [] } } @@ -727,8 +982,10 @@ public final class EventStreamClient { tokenRotationTask?.cancel() sseReconnectTask?.cancel() sseTask?.cancel() - for subscriber in subscribers.values { - subscriber.continuation.finish() + for bucket in subscribers.values { + for subscriber in bucket.values { + subscriber.continuation.finish() + } } } } diff --git a/clients/shared/Network/GatewayHTTPClient.swift b/clients/shared/Network/GatewayHTTPClient.swift index ea23e4ad80a..4c6a1288e45 100644 --- a/clients/shared/Network/GatewayHTTPClient.swift +++ b/clients/shared/Network/GatewayHTTPClient.swift @@ -489,13 +489,25 @@ public enum GatewayHTTPClient { /// in `AsyncBytes`). /// - Returns: A tuple of `(URLSession.AsyncBytes, URLResponse)` for streaming consumption. /// - Throws: `ClientError` if the request cannot be constructed, or network errors from `URLSession`. - public static func stream(path: String, timeout: TimeInterval = 30, session: URLSession = .shared) async throws -> (URLSession.AsyncBytes, URLResponse) { + public static func stream( + path: String, + timeout: TimeInterval = 30, + session: URLSession = .shared, + lastEventId: String? 
= nil + ) async throws -> (URLSession.AsyncBytes, URLResponse) { let connection = try resolveConnection() var request = try buildRequest(path: path, params: nil, method: "GET", timeout: timeout, connection: connection) request.setValue(sseAcceptHeader, forHTTPHeaderField: "Accept") request.setValue(DeviceIdStore.getOrCreate(), forHTTPHeaderField: "X-Vellum-Client-Id") request.setValue(clientInterfaceId, forHTTPHeaderField: "X-Vellum-Interface-Id") request.setValue(ProcessInfo.processInfo.hostName, forHTTPHeaderField: "X-Vellum-Machine-Name") + // Standard SSE reconnect field — the daemon's `/v1/events` route + // honors it when the stream is conversation-scoped, replaying + // persisted events with `seq > Last-Event-Id` from the durable log + // before delivering live events. Ignored on global subscriptions. + if let lastEventId, !lastEventId.isEmpty { + request.setValue(lastEventId, forHTTPHeaderField: "Last-Event-Id") + } logOutgoing(request, quiet: false) let (bytes, response) = try await session.bytes(for: request) if let http = response as? HTTPURLResponse { diff --git a/clients/shared/Network/Generated/GeneratedAPITypes.swift b/clients/shared/Network/Generated/GeneratedAPITypes.swift index 597311c8d05..5acd57d8123 100644 --- a/clients/shared/Network/Generated/GeneratedAPITypes.swift +++ b/clients/shared/Network/Generated/GeneratedAPITypes.swift @@ -460,11 +460,26 @@ public struct AssistantTextDelta: Codable, Sendable { public let type: String public let text: String public let conversationId: String? + /// Database row id of the assistant message this delta belongs to. Stamped + /// from the pre-allocated turn anchor (see `AssistantTurnStartEvent`). + /// Absent on streams produced by older daemons that pre-date the anchor + /// protocol or on synthetic deltas (canned greetings, slash-command echoes). + public let messageId: String? + /// 0-based content-block index within the parent `messageId`. 
Optional for + /// backwards compatibility with synthetic deltas that don't bind to a block. + public let blockIndex: Int? + /// Monotonically increasing per-conversation sequence number for idempotent + /// client replay. Optional during the streaming-architecture rollout — + /// daemons that pre-date the protocol omit it. + public let seq: Int? - public init(type: String, text: String, conversationId: String? = nil) { + public init(type: String, text: String, conversationId: String? = nil, messageId: String? = nil, blockIndex: Int? = nil, seq: Int? = nil) { self.type = type self.text = text self.conversationId = conversationId + self.messageId = messageId + self.blockIndex = blockIndex + self.seq = seq } } @@ -514,6 +529,63 @@ public struct AvatarUpdated: Codable, Sendable { } } +/// `block_open` SSE event. Declares the start of a new content block within an +/// assistant message. Paired with `BlockClose` at the block's end. +/// +/// Block kinds: +/// - `text` — a streamed text block opened on the first text delta +/// emitted after the previous block closed. +/// - `tool_use` — a tool invocation; opened immediately before the matching +/// `ToolUseStart` and closed when the corresponding `ToolResult` +/// arrives. +/// +/// `blockIndex` is 0-based and monotonically increases within a single message. +public struct BlockOpenEvent: Codable, Sendable { + public let type: String + public let messageId: String + public let blockIndex: Int + /// "text" or "tool_use". + public let blockType: String + /// Tool name when `blockType == "tool_use"`; absent otherwise. + public let toolName: String? + /// Tool-use id when `blockType == "tool_use"`; absent otherwise. + public let toolUseId: String? + /// Monotonically increasing per-conversation sequence number. + public let seq: Int + public let conversationId: String? + + public init(type: String = "block_open", messageId: String, blockIndex: Int, blockType: String, toolName: String? = nil, toolUseId: String? 
= nil, seq: Int, conversationId: String? = nil) { + self.type = type + self.messageId = messageId + self.blockIndex = blockIndex + self.blockType = blockType + self.toolName = toolName + self.toolUseId = toolUseId + self.seq = seq + self.conversationId = conversationId + } +} + +/// `block_close` SSE event. Peer of `BlockOpen`. Text blocks close when the +/// next non-text content starts (or when the turn ends); tool_use blocks close +/// when their matching `ToolResult` arrives. +public struct BlockCloseEvent: Codable, Sendable { + public let type: String + public let messageId: String + public let blockIndex: Int + /// Monotonically increasing per-conversation sequence number. + public let seq: Int + public let conversationId: String? + + public init(type: String = "block_close", messageId: String, blockIndex: Int, seq: Int, conversationId: String? = nil) { + self.type = type + self.messageId = messageId + self.blockIndex = blockIndex + self.seq = seq + self.conversationId = conversationId + } +} + public struct BundleAppRequest: Codable, Sendable { public let type: String public let appId: String @@ -2760,6 +2832,46 @@ public struct MessageComplete: Codable, Sendable { } } +/// `message_open` SSE event. Declares a stable `messageId` (UUIDv7) for an +/// assistant message at the start of a turn, before the first content event. +/// Paired with `MessageClose` at end-of-turn. Clients should anchor a message +/// bubble at `MessageOpen` instead of inferring identity from the first delta. +public struct MessageOpenEvent: Codable, Sendable { + public let type: String + public let messageId: String + /// "assistant". + public let role: String + /// Monotonically increasing per-conversation sequence number. + public let seq: Int + public let conversationId: String? + + public init(type: String = "message_open", messageId: String, role: String, seq: Int, conversationId: String? 
= nil) { + self.type = type + self.messageId = messageId + self.role = role + self.seq = seq + self.conversationId = conversationId + } +} + +/// `message_close` SSE event. Peer of `MessageOpen`. Marks the assistant turn +/// done in the new streaming architecture; the legacy `MessageComplete` event +/// continues to fire alongside it during the rollout for backward compatibility. +public struct MessageCloseEvent: Codable, Sendable { + public let type: String + public let messageId: String + /// Monotonically increasing per-conversation sequence number. + public let seq: Int + public let conversationId: String? + + public init(type: String = "message_close", messageId: String, seq: Int, conversationId: String? = nil) { + self.type = type + self.messageId = messageId + self.seq = seq + self.conversationId = conversationId + } +} + public struct MessageContentRequest: Codable, Sendable { public let type: String public let conversationId: String @@ -4809,13 +4921,23 @@ public struct ToolInputDelta: Codable, Sendable { public let conversationId: String? /// The tool_use block ID for client-side correlation. public let toolUseId: String? + /// Database row id of the assistant message that owns this tool_use block. + /// Same semantics as `AssistantTextDelta.messageId`. + public let messageId: String? + /// 0-based content-block index within the parent `messageId`. + public let blockIndex: Int? + /// Monotonically increasing per-conversation sequence number. + public let seq: Int? - public init(type: String, toolName: String, content: String, conversationId: String? = nil, toolUseId: String? = nil) { + public init(type: String, toolName: String, content: String, conversationId: String? = nil, toolUseId: String? = nil, messageId: String? = nil, blockIndex: Int? = nil, seq: Int? 
= nil) { self.type = type self.toolName = toolName self.content = content self.conversationId = conversationId self.toolUseId = toolUseId + self.messageId = messageId + self.blockIndex = blockIndex + self.seq = seq } } @@ -4997,8 +5119,15 @@ public struct ToolResult: Codable, Sendable { /// trust rule from the chip-ladder UI. public let riskAllowlistOptions: [ConfirmationRequestAllowlistOption]? public let riskDirectoryScopeOptions: [ConfirmationRequestDirectoryScopeOption]? + /// Database row id of the assistant message that owns the parent tool_use + /// block. Same semantics as `AssistantTextDelta.messageId`. + public let messageId: String? + /// 0-based content-block index within the parent `messageId`. + public let blockIndex: Int? + /// Monotonically increasing per-conversation sequence number. + public let seq: Int? - public init(type: String, toolName: String, result: String, isError: Bool? = nil, diff: ToolResultDiff? = nil, status: String? = nil, conversationId: String? = nil, imageDataList: [String]? = nil, toolUseId: String? = nil, riskLevel: String? = nil, riskReason: String? = nil, matchedTrustRuleId: String? = nil, approvalMode: String? = nil, approvalReason: String? = nil, riskThreshold: String? = nil, isContainerized: Bool? = nil, riskScopeOptions: [ToolResultRiskScopeOption]? = nil, riskAllowlistOptions: [ConfirmationRequestAllowlistOption]? = nil, riskDirectoryScopeOptions: [ConfirmationRequestDirectoryScopeOption]? = nil) { + public init(type: String, toolName: String, result: String, isError: Bool? = nil, diff: ToolResultDiff? = nil, status: String? = nil, conversationId: String? = nil, imageDataList: [String]? = nil, toolUseId: String? = nil, riskLevel: String? = nil, riskReason: String? = nil, matchedTrustRuleId: String? = nil, approvalMode: String? = nil, approvalReason: String? = nil, riskThreshold: String? = nil, isContainerized: Bool? = nil, riskScopeOptions: [ToolResultRiskScopeOption]? 
= nil, riskAllowlistOptions: [ConfirmationRequestAllowlistOption]? = nil, riskDirectoryScopeOptions: [ConfirmationRequestDirectoryScopeOption]? = nil, messageId: String? = nil, blockIndex: Int? = nil, seq: Int? = nil) { self.type = type self.toolName = toolName self.result = result @@ -5018,6 +5147,9 @@ public struct ToolResult: Codable, Sendable { self.riskScopeOptions = riskScopeOptions self.riskAllowlistOptions = riskAllowlistOptions self.riskDirectoryScopeOptions = riskDirectoryScopeOptions + self.messageId = messageId + self.blockIndex = blockIndex + self.seq = seq } } @@ -5065,13 +5197,23 @@ public struct ToolUseStart: Codable, Sendable { public let conversationId: String? /// The tool_use block ID for client-side correlation. public let toolUseId: String? + /// Database row id of the assistant message that owns this tool_use block. + /// Same semantics as `AssistantTextDelta.messageId`. + public let messageId: String? + /// 0-based content-block index within the parent `messageId`. + public let blockIndex: Int? + /// Monotonically increasing per-conversation sequence number. + public let seq: Int? - public init(type: String, toolName: String, input: [String: AnyCodable], conversationId: String? = nil, toolUseId: String? = nil) { + public init(type: String, toolName: String, input: [String: AnyCodable], conversationId: String? = nil, toolUseId: String? = nil, messageId: String? = nil, blockIndex: Int? = nil, seq: Int? 
= nil) { self.type = type self.toolName = toolName self.input = input self.conversationId = conversationId self.toolUseId = toolUseId + self.messageId = messageId + self.blockIndex = blockIndex + self.seq = seq } } diff --git a/clients/shared/Network/MessageTypes.swift b/clients/shared/Network/MessageTypes.swift index ea4125c2463..8792e17eb27 100644 --- a/clients/shared/Network/MessageTypes.swift +++ b/clients/shared/Network/MessageTypes.swift @@ -687,6 +687,26 @@ extension AssistantTextDelta { } } +/// Declares the start of an assistant message — carries a stable `messageId` +/// that the block-scoped events in this turn stamp on. Paired with +/// `MessageCloseMessage` at end-of-turn. +/// Backed by generated `MessageOpenEvent`. +public typealias MessageOpenMessage = MessageOpenEvent + +/// Declares the start of a content block within an assistant message. +/// Paired with `BlockCloseMessage`. +/// Backed by generated `BlockOpenEvent`. +public typealias BlockOpenMessage = BlockOpenEvent + +/// Declares the end of a content block within an assistant message. +/// Peer of `BlockOpenMessage`. +/// Backed by generated `BlockCloseEvent`. +public typealias BlockCloseMessage = BlockCloseEvent + +/// Declares the end of an assistant turn. Peer of `MessageOpenMessage`. +/// Backed by generated `MessageCloseEvent`. +public typealias MessageCloseMessage = MessageCloseEvent + /// Streamed thinking delta from the assistant's reasoning. 
public typealias AssistantThinkingDeltaMessage = AssistantThinkingDelta @@ -2914,6 +2934,10 @@ public enum ServerMessage: Decodable, Sendable { case assistantTextDelta(AssistantTextDeltaMessage) case assistantActivityState(AssistantActivityStateMessage) case assistantThinkingDelta(AssistantThinkingDeltaMessage) + case messageOpen(MessageOpenMessage) + case blockOpen(BlockOpenMessage) + case blockClose(BlockCloseMessage) + case messageClose(MessageCloseMessage) case messageComplete(MessageCompleteMessage) case conversationInfo(ConversationInfoMessage) case conversationInferenceProfileUpdated(ConversationInferenceProfileUpdatedMessage) @@ -3135,6 +3159,18 @@ public enum ServerMessage: Decodable, Sendable { case "assistant_thinking_delta": let message = try AssistantThinkingDeltaMessage(from: decoder) self = .assistantThinkingDelta(message) + case "message_open": + let message = try MessageOpenMessage(from: decoder) + self = .messageOpen(message) + case "block_open": + let message = try BlockOpenMessage(from: decoder) + self = .blockOpen(message) + case "block_close": + let message = try BlockCloseMessage(from: decoder) + self = .blockClose(message) + case "message_close": + let message = try MessageCloseMessage(from: decoder) + self = .messageClose(message) case "message_complete": let message = try MessageCompleteMessage(from: decoder) self = .messageComplete(message) diff --git a/clients/shared/Tests/GatewayConnectionManagerTests.swift b/clients/shared/Tests/GatewayConnectionManagerTests.swift index 99c842be48d..f2a7764f86f 100644 --- a/clients/shared/Tests/GatewayConnectionManagerTests.swift +++ b/clients/shared/Tests/GatewayConnectionManagerTests.swift @@ -40,14 +40,15 @@ final class GatewayConnectionManagerTests: XCTestCase { func testSubscribeReturnsStream() { let client = GatewayConnectionManager() - let stream = client.eventStreamClient.subscribe() - // Simply verify subscribe() returns without crashing; stream is non-nil (value type) + let stream = 
client.eventStreamClient.subscribeChatEvents() + // Simply verify the typed dispatcher returns without crashing; stream + // is non-nil (value type). _ = stream } func testEmitDeliversToSubscriber() async { let client = GatewayConnectionManager() - let stream = client.eventStreamClient.subscribe() + let stream = client.eventStreamClient.subscribeChatEvents() // Collect one message from the stream let expectation = XCTestExpectation(description: "Subscriber receives emitted message") @@ -81,8 +82,8 @@ final class GatewayConnectionManagerTests: XCTestCase { func testEmitDeliversToMultipleSubscribers() async { let client = GatewayConnectionManager() - let stream1 = client.eventStreamClient.subscribe() - let stream2 = client.eventStreamClient.subscribe() + let stream1 = client.eventStreamClient.subscribeChatEvents() + let stream2 = client.eventStreamClient.subscribeChatEvents() let exp1 = XCTestExpectation(description: "Subscriber 1 receives message") let exp2 = XCTestExpectation(description: "Subscriber 2 receives message") diff --git a/packages/skill-host-contracts/src/assistant-event.ts b/packages/skill-host-contracts/src/assistant-event.ts index d718fefa041..cfa4fd7a325 100644 --- a/packages/skill-host-contracts/src/assistant-event.ts +++ b/packages/skill-host-contracts/src/assistant-event.ts @@ -66,13 +66,26 @@ export function buildAssistantEvent( * * ``` * event: assistant_event\n - * id: \n + * id: \n * data: \n * \n * ``` + * + * When the envelope's `message` payload carries a numeric `seq`, that value + * is used for the SSE `id:` field so a reconnecting `EventSource` populates + * the `Last-Event-Id` header with the daemon's per-conversation sequence + * number (which the runtime route uses to drive durable-log replay). The + * envelope's UUID `event.id` remains in the JSON payload for legacy + * consumers and as a stable correlator across logs. 
+ * + * The id line is sanitized (newlines stripped, value cast to string) so a + * crafted `seq` or `event.id` cannot inject extra SSE fields. */ export function formatSseFrame(event: AssistantEvent): string { - const sanitizedId = event.id.replace(/[\n\r]/g, ""); + const seq = (event.message as { seq?: unknown } | null | undefined)?.seq; + const rawId = + typeof seq === "number" && Number.isFinite(seq) ? String(seq) : event.id; + const sanitizedId = rawId.replace(/[\n\r]/g, ""); const data = JSON.stringify(event); return `event: assistant_event\nid: ${sanitizedId}\ndata: ${data}\n\n`; }