diff --git a/assistant/src/__tests__/tool-preview-lifecycle.test.ts b/assistant/src/__tests__/tool-preview-lifecycle.test.ts index 514bba9cb73..6c73715403d 100644 --- a/assistant/src/__tests__/tool-preview-lifecycle.test.ts +++ b/assistant/src/__tests__/tool-preview-lifecycle.test.ts @@ -314,16 +314,30 @@ describe("tool preview lifecycle", () => { input: { path: "/test" }, }); - // Verify ordering - const eventTypes = collector.events.map((e) => e.type); + // Verify ordering of the legacy tool events. The streaming + // architecture (PR 1 of the streaming-message-architecture plan) + // interleaves additive `message_open` / `block_open` events here, + // so filter to the legacy tool events the test originally covered. + const STREAMING_LIFECYCLE_TYPES = new Set([ + "message_open", + "block_open", + "block_close", + "message_close", + ]); + const eventTypes = collector.events + .map((e) => e.type) + .filter((t) => !STREAMING_LIFECYCLE_TYPES.has(t)); expect(eventTypes).toEqual([ "tool_use_preview_start", "tool_input_delta", "tool_use_start", ]); - // Verify all events carry the same toolUseId + // Verify all tool-scoped events carry the same toolUseId. The new + // streaming lifecycle events are not tool-scoped (they identify + // messages and blocks instead) so they are excluded from the check. for (const event of collector.events) { + if (STREAMING_LIFECYCLE_TYPES.has(event.type)) continue; expect((event as any).toolUseId).toBe(toolUseId); } }); @@ -397,7 +411,18 @@ describe("tool preview lifecycle", () => { isError: false, }); - const eventTypes = collector.events.map((e) => e.type); + // Filter out the additive streaming lifecycle events emitted by the + // streaming-message-architecture plan (PR 1) so the legacy ordering + // assertion still holds. 
+ const STREAMING_LIFECYCLE_TYPES = new Set([ + "message_open", + "block_open", + "block_close", + "message_close", + ]); + const eventTypes = collector.events + .map((e) => e.type) + .filter((t) => !STREAMING_LIFECYCLE_TYPES.has(t)); expect(eventTypes).toEqual([ "tool_use_preview_start", "tool_input_delta", @@ -405,8 +430,11 @@ describe("tool preview lifecycle", () => { "tool_result", ]); - // Verify toolUseId consistency across all events + // Verify toolUseId consistency across tool-scoped events. The new + // streaming lifecycle events identify messages / blocks instead and + // are excluded. for (const event of collector.events) { + if (STREAMING_LIFECYCLE_TYPES.has(event.type)) continue; expect((event as any).toolUseId).toBe(toolUseId); } diff --git a/assistant/src/api/events/assistant-text-delta.ts b/assistant/src/api/events/assistant-text-delta.ts index d4da810badd..c54b8f232ed 100644 --- a/assistant/src/api/events/assistant-text-delta.ts +++ b/assistant/src/api/events/assistant-text-delta.ts @@ -23,6 +23,14 @@ export const AssistantTextDeltaEventSchema = z type: z.literal("assistant_text_delta"), text: z.string(), messageId: z.string().optional(), + /** 0-based content-block index within the parent `messageId`. Optional + * for backwards compatibility with synthetic deltas that don't bind + * to a block. */ + blockIndex: z.number().optional(), + /** Monotonically increasing per-conversation sequence number for + * idempotent client replay. Optional during the streaming-architecture + * rollout — daemons that pre-date PR 1 of the plan omit it. */ + seq: z.number().optional(), conversationId: z.string().optional(), }) .strict(); diff --git a/assistant/src/api/events/block-close.ts b/assistant/src/api/events/block-close.ts new file mode 100644 index 00000000000..26361d326b1 --- /dev/null +++ b/assistant/src/api/events/block-close.ts @@ -0,0 +1,27 @@ +/** + * `block_close` SSE event. 
+ * + * Emitted when a content block within an assistant message ends — the + * peer of `block_open`. Text blocks close when the next non-text content + * starts (or when the turn ends); tool_use blocks close on their matching + * `tool_result`, or earlier if another block opens first. Clients should treat + * `(messageId, blockIndex)` as the block identity for idempotent application. + * + * Canonical wire-contract source. Daemon code imports the type directly + * from this file; external consumers import via `@vellumai/assistant-api`. + */ + +import { z } from "zod"; + +export const BlockCloseEventSchema = z + .object({ + type: z.literal("block_close"), + messageId: z.string(), + blockIndex: z.number(), + /** Monotonically increasing per-conversation sequence number. */ + seq: z.number(), + conversationId: z.string().optional(), + }) + .strict(); + +export type BlockCloseEvent = z.infer; diff --git a/assistant/src/api/events/block-open.ts b/assistant/src/api/events/block-open.ts new file mode 100644 index 00000000000..5a6feac435d --- /dev/null +++ b/assistant/src/api/events/block-open.ts @@ -0,0 +1,41 @@ +/** + * `block_open` SSE event. + * + * Emitted when a new content block within an assistant message starts — + * paired with `block_close` at the block's end. Carries the message and + * block coordinates that every block-scoped event (`assistant_text_delta`, + * `tool_use_start`, `tool_input_delta`, `tool_result`) stamps in this turn. + * + * Block kinds today: + * - `text` — a streamed text block opened on the first text delta + * emitted after the previous block closed. + * - `tool_use` — a tool invocation; opened immediately before the + * matching `tool_use_start` event and closed when the + * corresponding `tool_result` arrives (or earlier, if another block opens first). + * + * `blockIndex` is 0-based and monotonically increases within a single + * message; it never repeats across blocks in the same `messageId`. + * + * Canonical wire-contract source. 
Daemon code imports the type directly + * from this file; external consumers import via `@vellumai/assistant-api`. + */ + +import { z } from "zod"; + +export const BlockOpenEventSchema = z + .object({ + type: z.literal("block_open"), + messageId: z.string(), + blockIndex: z.number(), + blockType: z.enum(["text", "tool_use"]), + /** Tool name when `blockType` is `tool_use`; omitted otherwise. */ + toolName: z.string().optional(), + /** Tool-use id when `blockType` is `tool_use`; omitted otherwise. */ + toolUseId: z.string().optional(), + /** Monotonically increasing per-conversation sequence number. */ + seq: z.number(), + conversationId: z.string().optional(), + }) + .strict(); + +export type BlockOpenEvent = z.infer; diff --git a/assistant/src/api/events/message-close.ts b/assistant/src/api/events/message-close.ts new file mode 100644 index 00000000000..de9b5d82025 --- /dev/null +++ b/assistant/src/api/events/message-close.ts @@ -0,0 +1,25 @@ +/** + * `message_close` SSE event. + * + * Emitted at the end of an assistant turn — the peer of `message_open`, + * carrying the same `messageId`. Marks the turn done in the new + * streaming architecture; the legacy `message_complete` event continues + * to fire alongside it during the rollout for backward compatibility. + * + * Canonical wire-contract source. Daemon code imports the type directly + * from this file; external consumers import via `@vellumai/assistant-api`. + */ + +import { z } from "zod"; + +export const MessageCloseEventSchema = z + .object({ + type: z.literal("message_close"), + messageId: z.string(), + /** Monotonically increasing per-conversation sequence number. 
*/ + seq: z.number(), + conversationId: z.string().optional(), + }) + .strict(); + +export type MessageCloseEvent = z.infer; diff --git a/assistant/src/api/events/message-open.ts b/assistant/src/api/events/message-open.ts new file mode 100644 index 00000000000..62a9945bc0d --- /dev/null +++ b/assistant/src/api/events/message-open.ts @@ -0,0 +1,32 @@ +/** + * `message_open` SSE event. + * + * Emitted by the daemon on the first content emission of an assistant + * turn — before the first `assistant_text_delta` or `tool_use_start` — + * to declare a stable `messageId` (UUIDv7) for the message that the rest + * of the turn's events will stamp on their `messageId` field. Paired with + * `message_close` at end-of-turn. Clients should anchor a bubble at + * `message_open` instead of inferring identity from the first delta. + * + * Additive alongside the legacy `assistant_text_delta` + `message_complete` + * pair during the streaming-architecture rollout; new clients prefer the + * `message_open` / `block_open` / `block_close` / `message_close` shape. + * + * Canonical wire-contract source. Daemon code imports the type directly + * from this file; external consumers import via `@vellumai/assistant-api`. + */ + +import { z } from "zod"; + +export const MessageOpenEventSchema = z + .object({ + type: z.literal("message_open"), + messageId: z.string(), + role: z.enum(["assistant"]), + /** Monotonically increasing per-conversation sequence number. 
*/ + seq: z.number(), + conversationId: z.string().optional(), + }) + .strict(); + +export type MessageOpenEvent = z.infer; diff --git a/assistant/src/api/events/tool-use-start.ts b/assistant/src/api/events/tool-use-start.ts index b76f7d47409..49c6f6c67eb 100644 --- a/assistant/src/api/events/tool-use-start.ts +++ b/assistant/src/api/events/tool-use-start.ts @@ -25,6 +25,11 @@ export const ToolUseStartEventSchema = z input: z.record(z.string(), z.unknown()), toolUseId: z.string().optional(), messageId: z.string().optional(), + /** 0-based content-block index within the parent `messageId`. */ + blockIndex: z.number().optional(), + /** Monotonically increasing per-conversation sequence number for + * idempotent client replay. */ + seq: z.number().optional(), conversationId: z.string().optional(), }) .strict(); diff --git a/assistant/src/api/index.ts b/assistant/src/api/index.ts index 619e59ada79..50f16ac5fba 100644 --- a/assistant/src/api/index.ts +++ b/assistant/src/api/index.ts @@ -2,13 +2,17 @@ import { z } from "zod"; import { AssistantTextDeltaEventSchema } from "./events/assistant-text-delta.js"; import { AssistantTurnStartEventSchema } from "./events/assistant-turn-start.js"; +import { BlockCloseEventSchema } from "./events/block-close.js"; +import { BlockOpenEventSchema } from "./events/block-open.js"; import { DocumentCommentCreatedEventSchema } from "./events/document-comment-created.js"; import { DocumentCommentDeletedEventSchema } from "./events/document-comment-deleted.js"; import { DocumentCommentReopenedEventSchema } from "./events/document-comment-reopened.js"; import { DocumentCommentResolvedEventSchema } from "./events/document-comment-resolved.js"; import { GenerationCancelledEventSchema } from "./events/generation-cancelled.js"; import { GenerationHandoffEventSchema } from "./events/generation-handoff.js"; +import { MessageCloseEventSchema } from "./events/message-close.js"; import { MessageCompleteEventSchema } from 
"./events/message-complete.js"; +import { MessageOpenEventSchema } from "./events/message-open.js"; import { OpenUrlEventSchema } from "./events/open-url.js"; import { RelationshipStateUpdatedEventSchema } from "./events/relationship-state-updated.js"; import { ToolUseStartEventSchema } from "./events/tool-use-start.js"; @@ -26,6 +30,14 @@ export { type AssistantTurnStartEvent, AssistantTurnStartEventSchema, } from "./events/assistant-turn-start.js"; +export { + type BlockCloseEvent, + BlockCloseEventSchema, +} from "./events/block-close.js"; +export { + type BlockOpenEvent, + BlockOpenEventSchema, +} from "./events/block-open.js"; export { type DocumentCommentCreatedEvent, DocumentCommentCreatedEventSchema, @@ -50,10 +62,18 @@ export { type GenerationHandoffEvent, GenerationHandoffEventSchema, } from "./events/generation-handoff.js"; +export { + type MessageCloseEvent, + MessageCloseEventSchema, +} from "./events/message-close.js"; export { type MessageCompleteEvent, MessageCompleteEventSchema, } from "./events/message-complete.js"; +export { + type MessageOpenEvent, + MessageOpenEventSchema, +} from "./events/message-open.js"; export { type OpenUrlEvent, OpenUrlEventSchema } from "./events/open-url.js"; export { type RelationshipStateUpdatedEvent, @@ -108,13 +128,17 @@ export { export const AssistantEventSchema = z.discriminatedUnion("type", [ AssistantTextDeltaEventSchema, AssistantTurnStartEventSchema, + BlockCloseEventSchema, + BlockOpenEventSchema, DocumentCommentCreatedEventSchema, DocumentCommentDeletedEventSchema, DocumentCommentReopenedEventSchema, DocumentCommentResolvedEventSchema, GenerationCancelledEventSchema, GenerationHandoffEventSchema, + MessageCloseEventSchema, MessageCompleteEventSchema, + MessageOpenEventSchema, OpenUrlEventSchema, RelationshipStateUpdatedEventSchema, ToolUseStartEventSchema, diff --git a/assistant/src/daemon/conversation-agent-loop-handlers.ts b/assistant/src/daemon/conversation-agent-loop-handlers.ts index 
9fcaf217d9c..96304be68e1 100644 --- a/assistant/src/daemon/conversation-agent-loop-handlers.ts +++ b/assistant/src/daemon/conversation-agent-loop-handlers.ts @@ -7,7 +7,7 @@ */ import type pino from "pino"; -import { v4 as uuid } from "uuid"; +import { v4 as uuid, v7 as uuidv7 } from "uuid"; import type { AgentEvent } from "../agent/loop.js"; import type { @@ -71,6 +71,7 @@ import { } from "./conversation-error.js"; import { isProviderOrderingError } from "./conversation-slash.js"; import { resolveTurnTimezoneContext } from "./date-context.js"; +import { nextSeq } from "./streaming-events.js"; import type { CardSurfaceData, ServerMessage, @@ -218,6 +219,27 @@ export interface EventHandlerState { readonly serverToolStartedAt: Map; /** Original input from server_tool_start, keyed by tool_use_id, so the complete handler can read the query. */ readonly serverToolInputs: Map>; + // ── Streaming architecture (PR 1) — pre-assigned message_id, block_index, seq + /** + * UUIDv7 pre-assigned on the first content emission of the turn (text + * delta or tool_use). The same id is reused as the row primary key when + * `handleMessageComplete` persists the assistant message, so every + * streaming event for the turn — and the final persisted row — share + * one stable identifier. + */ + currentMessageId: string | undefined; + /** + * 0-based content-block counter for the turn. Bumped each time a new + * block opens (text → tool_use, tool_use → text, tool_use → tool_use). + * `undefined` until the first block opens. + */ + currentBlockIndex: number | undefined; + /** Kind of the currently-open block, or `null` between blocks. */ + currentBlockType: "text" | "tool_use" | null; + /** Maps `toolUseId → blockIndex` so `tool_input_delta` and `tool_result` + * events can be stamped with the same block coordinate as the matching + * `tool_use_start` / `block_open`. 
*/ + readonly toolUseIdToBlockIndex: Map; } /** Immutable context shared across event handlers within a single agent loop run. */ @@ -277,6 +299,10 @@ export function createEventHandlerState(): EventHandlerState { turnStartedAt: Date.now(), serverToolStartedAt: new Map(), serverToolInputs: new Map(), + currentMessageId: undefined, + currentBlockIndex: undefined, + currentBlockType: null, + toolUseIdToBlockIndex: new Map(), }; } @@ -403,6 +429,119 @@ function resolveAssistantReplyTimestampTimezone( }).effectiveTimezone; } +// ── Streaming architecture helpers (PR 1) ──────────────────────────── +// +// These keep the new addressable-event protocol additive alongside the +// legacy `assistant_text_delta` + `message_complete` shape. The agent +// loop pre-assigns a UUIDv7 the first time the turn produces content, +// then re-uses it when the row finally persists in +// `handleMessageComplete`. Every streaming event for the turn carries +// `(messageId, blockIndex, seq)` so reconnecting clients can dedupe +// and order events deterministically. + +/** + * Ensure the turn's assistant message id is allocated and `message_open` + * has been emitted. Called from the first text-delta and tool-use entry + * points. Idempotent — subsequent calls in the same turn are no-ops. + */ +function ensureMessageOpen( + state: EventHandlerState, + deps: EventHandlerDeps, +): string { + if (state.currentMessageId) return state.currentMessageId; + const messageId = uuidv7(); + state.currentMessageId = messageId; + deps.onEvent({ + type: "message_open", + conversationId: deps.ctx.conversationId, + messageId, + role: "assistant", + seq: nextSeq(deps.ctx.conversationId), + }); + return messageId; +} + +/** + * Close the currently-open block, if any. Emits a `block_close` event + * stamped with the open block's index. The caller is expected to + * advance to a new block (via {@link openBlock}) when applicable. 
+ */ +function closeCurrentBlock( + state: EventHandlerState, + deps: EventHandlerDeps, +): void { + if (state.currentBlockType === null || state.currentBlockIndex === undefined) + return; + const messageId = state.currentMessageId; + if (!messageId) return; + deps.onEvent({ + type: "block_close", + conversationId: deps.ctx.conversationId, + messageId, + blockIndex: state.currentBlockIndex, + seq: nextSeq(deps.ctx.conversationId), + }); + state.currentBlockType = null; +} + +/** + * Open a new content block (`text` or `tool_use`). Allocates the next + * `blockIndex`, updates the state, and emits `block_open`. Every caller + * runs {@link ensureMessageOpen} first so `state.currentMessageId` is + * always set. + */ +function openBlock( + state: EventHandlerState, + deps: EventHandlerDeps, + blockType: "text" | "tool_use", + toolInfo?: { toolName: string; toolUseId: string }, +): number { + const messageId = state.currentMessageId!; + const nextIndex = + state.currentBlockIndex === undefined ? 0 : state.currentBlockIndex + 1; + state.currentBlockIndex = nextIndex; + state.currentBlockType = blockType; + if (blockType === "tool_use" && toolInfo) { + state.toolUseIdToBlockIndex.set(toolInfo.toolUseId, nextIndex); + } + deps.onEvent({ + type: "block_open", + conversationId: deps.ctx.conversationId, + messageId, + blockIndex: nextIndex, + blockType, + ...(toolInfo + ? { toolName: toolInfo.toolName, toolUseId: toolInfo.toolUseId } + : {}), + seq: nextSeq(deps.ctx.conversationId), + }); + return nextIndex; +} + +/** + * Ensure the currently-open block matches the requested kind, opening a + * new block (and closing the prior one) when the kind changes or when + * no block is open yet. Returns the resolved `blockIndex`. + * + * `tool_use` blocks are always opened anew so each tool invocation gets + * its own block index, even when consecutive tool calls fire without an + * intervening text block. 
+ */ +function ensureBlockForKind( + state: EventHandlerState, + deps: EventHandlerDeps, + kind: "text" | "tool_use", + toolInfo?: { toolName: string; toolUseId: string }, +): number { + if (kind === "text" && state.currentBlockType === "text") { + return state.currentBlockIndex!; + } + if (state.currentBlockType !== null) { + closeCurrentBlock(state, deps); + } + return openBlock(state, deps, kind, toolInfo); +} + // ── Individual Handlers ────────────────────────────────────────────── function handleTextDelta( @@ -427,10 +566,15 @@ function handleTextDelta( "Thinking", ); } + const messageId = ensureMessageOpen(state, deps); + const blockIndex = ensureBlockForKind(state, deps, "text"); deps.onEvent({ type: "assistant_text_delta", text: drained.emitText, conversationId: deps.ctx.conversationId, + messageId, + blockIndex, + seq: nextSeq(deps.ctx.conversationId), }); if (deps.shouldGenerateTitle) state.firstAssistantText += drained.emitText; } @@ -492,12 +636,20 @@ export function handleToolUse( deps.reqId, statusText, ); + const messageId = ensureMessageOpen(state, deps); + const blockIndex = ensureBlockForKind(state, deps, "tool_use", { + toolName: event.name, + toolUseId: event.id, + }); deps.onEvent({ type: "tool_use_start", toolName: event.name, input: event.input, conversationId: deps.ctx.conversationId, toolUseId: event.id, + messageId, + blockIndex, + seq: nextSeq(deps.ctx.conversationId), }); } @@ -598,7 +750,7 @@ function handleToolOutputChunk( } export function handleInputJsonDelta( - _state: EventHandlerState, + state: EventHandlerState, deps: EventHandlerDeps, event: Extract, ): void { @@ -606,12 +758,16 @@ export function handleInputJsonDelta( // stream for app_create code previews. Non-app tools would send large // cumulative JSON on every delta with no benefit. 
if (!APP_TOOL_NAMES.has(event.toolName)) return; + const blockIndex = state.toolUseIdToBlockIndex.get(event.toolUseId); deps.onEvent({ type: "tool_input_delta", toolName: event.toolName, content: event.accumulatedJson, conversationId: deps.ctx.conversationId, toolUseId: event.toolUseId, + ...(state.currentMessageId ? { messageId: state.currentMessageId } : {}), + ...(blockIndex !== undefined ? { blockIndex } : {}), + seq: nextSeq(deps.ctx.conversationId), }); } @@ -730,6 +886,7 @@ export function handleToolResult( } // Send to client last so state is consistent even if onEvent throws. + const toolBlockIndex = state.toolUseIdToBlockIndex.get(event.toolUseId); deps.onEvent({ type: "tool_result", toolName: "", @@ -752,7 +909,22 @@ export function handleToolResult( approvalReason: event.approvalReason, riskThreshold: event.riskThreshold, activityMetadata: event.activityMetadata, + ...(state.currentMessageId ? { messageId: state.currentMessageId } : {}), + ...(toolBlockIndex !== undefined ? { blockIndex: toolBlockIndex } : {}), + seq: nextSeq(deps.ctx.conversationId), }); + + // Close the tool_use block now that its result has been delivered. We + // can only safely emit `block_close` when the current block is still + // the matching tool_use — if the model has already started emitting + // text (which would have opened a new block in `handleTextDelta`) the + // earlier block close fired there. 
+ if ( + state.currentBlockType === "tool_use" && + state.currentBlockIndex === toolBlockIndex + ) { + closeCurrentBlock(state, deps); + } } /** @@ -990,10 +1162,15 @@ export async function handleMessageComplete( // Flush any remaining directive display buffer if (state.pendingDirectiveDisplayBuffer.length > 0) { + const flushMessageId = ensureMessageOpen(state, deps); + const flushBlockIndex = ensureBlockForKind(state, deps, "text"); deps.onEvent({ type: "assistant_text_delta", text: state.pendingDirectiveDisplayBuffer, conversationId: deps.ctx.conversationId, + messageId: flushMessageId, + blockIndex: flushBlockIndex, + seq: nextSeq(deps.ctx.conversationId), }); if (deps.shouldGenerateTitle) state.firstAssistantText += state.pendingDirectiveDisplayBuffer; @@ -1161,6 +1338,14 @@ export async function handleMessageComplete( // pipeline. No `syncToDisk` here — the orchestrator separately invokes // `syncMessageToDisk` on `state.lastAssistantMessageId` after the loop // completes (see `conversation-agent-loop.ts::syncLastAssistantMessageToDisk`). + // + // When the agent loop opened a streaming message earlier this turn + // (`state.currentMessageId` was assigned in `ensureMessageOpen`), reuse + // that pre-allocated id as the row primary key so every event the daemon + // already streamed — text deltas, tool_use/tool_result, the new + // message_open / block_open / block_close events — references the same + // id that downstream consumers (message_complete, history reload, + // memory backfills) see. const assistantPersistResult = (await runPipeline( "persistence", getMiddlewaresFor("persistence"), @@ -1171,6 +1356,9 @@ export async function handleMessageComplete( role: "assistant", content: JSON.stringify(contentForPersistence), metadata: assistantChannelMetadata, + ...(state.currentMessageId + ? 
{ addOptions: { messageId: state.currentMessageId } } + : {}), }, buildHandlerTurnContext(deps), DEFAULT_TIMEOUTS.persistence, @@ -1178,6 +1366,37 @@ export async function handleMessageComplete( const assistantMsg = assistantPersistResult.message; state.lastAssistantMessageId = assistantMsg.id; + // Streaming architecture: close any block still open, then emit the + // turn's `message_close`. Synthesizes a `message_open` first when the + // turn produced no content events (canned messages, aux notifier + // injections, slash commands), so clients that only consume the new + // protocol still see a complete `message_open` → `message_close` pair + // for every persisted assistant row. + if (!state.currentMessageId) { + state.currentMessageId = assistantMsg.id; + deps.onEvent({ + type: "message_open", + conversationId: deps.ctx.conversationId, + messageId: assistantMsg.id, + role: "assistant", + seq: nextSeq(deps.ctx.conversationId), + }); + } + if (state.currentBlockType !== null) { + closeCurrentBlock(state, deps); + } + deps.onEvent({ + type: "message_close", + conversationId: deps.ctx.conversationId, + messageId: state.currentMessageId, + seq: nextSeq(deps.ctx.conversationId), + }); + // Reset per-message streaming state so the next turn starts fresh. + state.currentMessageId = undefined; + state.currentBlockIndex = undefined; + state.currentBlockType = null; + state.toolUseIdToBlockIndex.clear(); + // Backfill message_id on all LLM request logs from this turn. // The agent loop is single-threaded per conversation, so all rows with // message_id IS NULL belong to the current turn. 
@@ -1436,12 +1655,20 @@ export async function dispatchAgentEvent( ); state.serverToolStartedAt.set(event.toolUseId, Date.now()); state.serverToolInputs.set(event.toolUseId, event.input); + const messageId = ensureMessageOpen(state, deps); + const blockIndex = ensureBlockForKind(state, deps, "tool_use", { + toolName: event.name, + toolUseId: event.toolUseId, + }); deps.onEvent({ type: "tool_use_start", toolName: event.name, input: event.input, conversationId: deps.ctx.conversationId, toolUseId: event.toolUseId, + messageId, + blockIndex, + seq: nextSeq(deps.ctx.conversationId), }); break; } @@ -1514,6 +1741,9 @@ export async function dispatchAgentEvent( .map((r) => `${r.title}\n${r.url}`) .join("\n\n"); + const serverToolBlockIndex = state.toolUseIdToBlockIndex.get( + event.toolUseId, + ); deps.onEvent({ type: "tool_result", toolName: "web_search", @@ -1522,7 +1752,20 @@ export async function dispatchAgentEvent( conversationId: deps.ctx.conversationId, toolUseId: event.toolUseId, ...(metadata ? { activityMetadata: { webSearch: metadata } } : {}), + ...(state.currentMessageId + ? { messageId: state.currentMessageId } + : {}), + ...(serverToolBlockIndex !== undefined + ? 
{ blockIndex: serverToolBlockIndex } + : {}), + seq: nextSeq(deps.ctx.conversationId), }); + if ( + state.currentBlockType === "tool_use" && + state.currentBlockIndex === serverToolBlockIndex + ) { + closeCurrentBlock(state, deps); + } break; } case "error": diff --git a/assistant/src/daemon/conversation-store.ts b/assistant/src/daemon/conversation-store.ts index 54011117e88..2243aaa3607 100644 --- a/assistant/src/daemon/conversation-store.ts +++ b/assistant/src/daemon/conversation-store.ts @@ -28,6 +28,7 @@ import { getSandboxWorkingDir } from "../util/platform.js"; import { Conversation } from "./conversation.js"; import type { ConversationEvictor } from "./conversation-evictor.js"; import type { ConversationCreateOptions } from "./handlers/shared.js"; +import { resetSeq } from "./streaming-events.js"; import { buildTransportHints } from "./transport-hints.js"; // ── Private store ────────────────────────────────────────────────── @@ -336,6 +337,7 @@ export function destroyActiveConversation(conversationId: string): void { conversation.dispose(); deleteConversation(conversationId); deleteConversationOptions(conversationId); + resetSeq(conversationId); } /** @@ -348,6 +350,7 @@ export function clearAllActiveConversations(): number { for (const id of conversationIds()) { removeFromEvictor(id); subagentManager.abortAllForParent(id); + resetSeq(id); } for (const conversation of allConversations()) { conversation.dispose(); diff --git a/assistant/src/daemon/message-types/messages.ts b/assistant/src/daemon/message-types/messages.ts index 780dbb71ef3..e3b54b13b7d 100644 --- a/assistant/src/daemon/message-types/messages.ts +++ b/assistant/src/daemon/message-types/messages.ts @@ -2,7 +2,11 @@ import type { AssistantTextDeltaEvent } from "../../api/events/assistant-text-delta.js"; import type { AssistantTurnStartEvent } from "../../api/events/assistant-turn-start.js"; +import type { BlockCloseEvent } from "../../api/events/block-close.js"; +import type { BlockOpenEvent } 
from "../../api/events/block-open.js"; +import type { MessageCloseEvent } from "../../api/events/message-close.js"; import type { MessageCompleteEvent } from "../../api/events/message-complete.js"; +import type { MessageOpenEvent } from "../../api/events/message-open.js"; import type { ToolUseStartEvent } from "../../api/events/tool-use-start.js"; import type { ChannelId, InterfaceId } from "../../channels/types.js"; import type { CommandIntent, UserMessageAttachment } from "./shared.js"; @@ -120,6 +124,10 @@ export interface ToolInputDelta { /** Database ID of the assistant message that owns this tool_use block. * Same semantics as `AssistantTextDeltaEvent.messageId`. */ messageId?: string; + /** 0-based content-block index within the parent `messageId`. */ + blockIndex?: number; + /** Monotonically increasing per-conversation sequence number. */ + seq?: number; } export interface ToolResult { @@ -183,6 +191,10 @@ export interface ToolResult { /** Structured activity metadata for rich client rendering. Optional; old * clients that key off `result` continue to work unchanged. */ activityMetadata?: ToolActivityMetadata; + /** 0-based content-block index within the parent `messageId`. */ + blockIndex?: number; + /** Monotonically increasing per-conversation sequence number. */ + seq?: number; } export interface ConfirmationRequest { @@ -532,6 +544,10 @@ export type _MessagesServerMessages = | ConfirmationRequest | SecretRequest | QuestionRequest + | MessageOpenEvent + | BlockOpenEvent + | BlockCloseEvent + | MessageCloseEvent | MessageCompleteEvent | ErrorMessage | MessageQueued diff --git a/assistant/src/daemon/streaming-events.ts b/assistant/src/daemon/streaming-events.ts new file mode 100644 index 00000000000..67df8a929ed --- /dev/null +++ b/assistant/src/daemon/streaming-events.ts @@ -0,0 +1,39 @@ +/** + * Per-conversation streaming-event sequencing. 
+ * + * The streaming architecture (see `.private/plans/streaming-message-architecture.md`) + * tags every event the daemon emits for a conversation with a monotonically + * increasing `seq`. The counter lives in daemon memory until `resetSeq()` + * drops it — it is keyed by `conversationId`, starts at `0` (so the first + * `nextSeq()` returns `1`), and is bumped on every call. PR 2 in the plan will + * persist these sequences to durable storage and reseed from the max + * persisted value at daemon startup; for now we only need monotonic + * in-memory uniqueness so reconnecting clients can detect replays. + * + * The counter is intentionally a module-level map keyed by conversationId + * (rather than living on `Conversation` / `AgentLoopConversationContext`) + * so non-agent-loop emit paths — slash commands, surface broadcasts, + * aux notifier injections — can stamp `seq` without having to thread the + * conversation context through every call site. + */ + +const seqCounters = new Map(); + +/** Return the next monotonic `seq` for the given conversation. */ +export function nextSeq(conversationId: string): number { + const current = seqCounters.get(conversationId) ?? 0; + const next = current + 1; + seqCounters.set(conversationId, next); + return next; +} + +/** Read the current seq counter without advancing it (testing only). */ +export function peekSeq(conversationId: string): number { + return seqCounters.get(conversationId) ?? 0; +} + +/** Drop the seq counter for a conversation. Used when a conversation is + * evicted/destroyed so process memory does not grow unbounded. 
*/ +export function resetSeq(conversationId: string): void { + seqCounters.delete(conversationId); +} diff --git a/assistant/src/memory/conversation-crud.ts b/assistant/src/memory/conversation-crud.ts index 594282e2cf4..6b555e71fa3 100644 --- a/assistant/src/memory/conversation-crud.ts +++ b/assistant/src/memory/conversation-crud.ts @@ -306,6 +306,13 @@ interface InsertMessageCoreParams { content: string; metadata?: Record; clientMessageId?: string; + /** + * Optional pre-allocated message id. Callers that need a stable id before + * the row is persisted (the streaming agent loop assigns a UUIDv7 at + * message-open time so every event for the turn can stamp it) pass it + * here; the insert uses the supplied id instead of minting a fresh one. + */ + messageId?: string; } /** @@ -332,9 +339,16 @@ interface InsertMessageCoreParams { async function insertMessageCore( params: InsertMessageCoreParams, ): Promise { - const { conversationId, role, content, metadata, clientMessageId } = params; + const { + conversationId, + role, + content, + metadata, + clientMessageId, + messageId: preAssignedMessageId, + } = params; const db = getDb(); - const messageId = uuid(); + const messageId = preAssignedMessageId ?? uuid(); if (metadata) { const result = messageMetadataSchema.safeParse(metadata); @@ -1191,7 +1205,7 @@ export async function addMessage( role: MessageRole, content: string, metadata?: Record, - opts?: { skipIndexing?: boolean }, + opts?: { skipIndexing?: boolean; messageId?: string }, clientMessageId?: string, ) { const inserted = await insertMessageCore({ @@ -1200,6 +1214,7 @@ export async function addMessage( content, metadata, clientMessageId, + ...(opts?.messageId ? 
{ messageId: opts.messageId } : {}), }); if (inserted.deduplicated) { diff --git a/assistant/src/plugins/types.ts b/assistant/src/plugins/types.ts index 362806aadb8..2835d4022da 100644 --- a/assistant/src/plugins/types.ts +++ b/assistant/src/plugins/types.ts @@ -470,7 +470,16 @@ export type PersistAddArgs = { readonly role: MessageRole; readonly content: string; readonly metadata?: Record; - readonly addOptions?: { readonly skipIndexing?: boolean }; + readonly addOptions?: { + readonly skipIndexing?: boolean; + /** + * Optional pre-allocated message id. The streaming agent loop pre-assigns + * a UUIDv7 at message-open time so every event in the turn can stamp it; + * when the row finally persists it must reuse the same id. Plugin + * middleware should pass this through unchanged. + */ + readonly messageId?: string; + }; /** * When `true`, the default plugin additionally invokes * {@link syncMessageToDisk} with the returned message's id. Requires