From 5749e25808490b02ea0953db1f24de40746c509b Mon Sep 17 00:00:00 2001 From: "vellum-apollo-bot[bot]" <242025090+vellum-apollo-bot[bot]@users.noreply.github.com> Date: Wed, 27 May 2026 21:04:57 +0000 Subject: [PATCH 1/3] B3: pre-allocate assistant row at llm_call_started boundary MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The wire-protocol additions in B2 (assistant_turn_start + messageId on streaming events) were emitted but the row they reference didn't exist yet — handleMessageComplete still INSERTed at end-of-turn, so the client could not anchor deltas to a stable DB id during the turn. B3 reserves the row up-front and converts the terminal write to an in-place update. Production changes: - assistant/src/agent/loop.ts: add 'llm_call_started' AgentEvent and emit it inside AgentLoop.run() immediately before provider.sendMessage(), awaited so the reserve completes before any streaming delta. The event carries the optional callSite tag so downstream handlers can decide whether to reserve for this call. - assistant/src/daemon/conversation-agent-loop-handlers.ts: - extract buildAssistantChannelMetadata() so the channel/provenance/slackMeta envelope (previously built inside handleMessageComplete) can be stamped onto the row at reserve time. All inputs are stable across the LLM call; Slack channelTs is intentionally absent and back-filled by deliverReplyViaCallback as today. - new handleLlmCallStarted(): runs the persistence pipeline with op:'reserve', stashes the resulting id on state.lastAssistantMessageId, and emits 'assistant_turn_start' on the wire. - dispatchAgentEvent routes 'llm_call_started' to the new handler. - handleMessageComplete flips its assistant persistence call from op:'add' to op:'updateContent' against state.lastAssistantMessageId. Throws if no row was reserved — production invariant: every message_complete is preceded by exactly one llm_call_started. - every wire emission inside the call stamps messageId: state.lastAssistantMessageId — assistant_text_delta, assistant_thinking_delta, tool_use_start, tool_use_preview_start, tool_output_chunk variants, input_json_delta, tool_result, the handleMessageComplete buffer flush, and the two dispatchAgentEvent fallthroughs. - assistant/src/daemon/wake-target-adapter.ts: translate the new AgentEvent type as no-op (returns null). Wake-path persistence still goes through persistTailMessage (an addMessage-shaped call) rather than the reserve→updateContent pipeline; full wake-path parity is a B3 follow-up. Multi-LLM-call agent turns (LLM call → tool execution → LLM call) emit one llm_call_started per call, so each call reserves its own row. findDisplayTurnEndIndex already collapses consecutive assistant rows for the merged history view, matching today's per-call DB layout. Test updates: - conversation-agent-loop.test.ts: 60/60. Primed 21 fixtures; mixed approach — surgical edits for fixtures with unique anchors, a one-shot python loop for 8 retry/tool-first fixtures with non-unique anchors. Widened the 'drains queue after completion' fixture's onEvent param from sync to void | Promise so the prime could be awaited before message_complete dispatched. - conversation-agent-loop-overflow.test.ts (6/6), conversation-abort-tool-results.test.ts (2/2), conversation-provider-retry-repair.test.ts (9/9), conversation-slash-unknown.test.ts (2/2), conversation-workspace-injection.test.ts (10/10), conversation-workspace-tool-tracking.test.ts (6/6): added updateMessageContent stub to the conversation-crud mock and (for overflow) primed fixtures via the prime-fixtures heuristic walker. - conversation-queue.test.ts (49/49 + 1 todo) and conversation-slash-queue.test.ts (4/4): widened PendingRun.onEvent and the AgentLoop.run mock signature to void | Promise, made resolveRun async with awaited prime/usage/message_complete, and awaited every resolveRun call site. For five direct (non-helper) run.onEvent sites in attachment / cancel-semantics tests, prepended an awaited llm_call_started prime so message_complete sees a reserved row. - outbound-slack-persistence.test.ts (7/7) and persistence-secret-redaction.test.ts (8/8): full direct-handler recipe. Rewired the reserveMessage mock to push the reserved row (with id, content:"", and the production metadata envelope) into the same capture arrays that addMessage uses, so existing assertion helpers (lastAssistantPersisted etc.) find it. updateMessageContent finds the row by id and updates it in-place. Imported handleLlmCallStarted and inserted an awaited call before each handleMessageComplete invocation. The unrelated test-pollution failures observed when running the full suite (schedule-store, skill_load, migration-*, etc.) reproduce identically on the B2 baseline — 62 pass / 101 fail across the same 11 conversation files — confirming they are pre-existing module-cache / env-var leak issues independent of this change. Every touched file passes 100% standalone. ### AGENTS.md compliance - assistant/AGENTS.md "Test machinery isolation": all test changes remain inside src/__tests__/; no new imports from src/ into helpers or preloads; helpers continue to use bun:test + sibling-test scope. - software-engineering/unit-testing.md rule #3 (mock.module audit): the new exported handleLlmCallStarted is imported only by tests that already mock the handlers module explicitly; no existing mock.module sites needed a stub-list update. --- .../conversation-abort-tool-results.test.ts | 4 + .../conversation-agent-loop-overflow.test.ts | 33 +++ .../__tests__/conversation-agent-loop.test.ts | 96 +++++++- ...conversation-provider-retry-repair.test.ts | 4 + .../src/__tests__/conversation-queue.test.ts | 143 ++++++------ .../conversation-slash-queue.test.ts | 28 ++- .../conversation-slash-unknown.test.ts | 4 + .../conversation-workspace-injection.test.ts | 4 + ...nversation-workspace-tool-tracking.test.ts | 4 + .../outbound-slack-persistence.test.ts | 48 +++- .../persistence-secret-redaction.test.ts | 96 ++++++-- assistant/src/agent/loop.ts | 18 ++ .../conversation-agent-loop-handlers.ts | 209 ++++++++++++------ assistant/src/daemon/wake-target-adapter.ts | 9 + 14 files changed, 539 insertions(+), 161 deletions(-) diff --git a/assistant/src/__tests__/conversation-abort-tool-results.test.ts b/assistant/src/__tests__/conversation-abort-tool-results.test.ts index 642f955bfa4..21199f9b45d 100644 --- a/assistant/src/__tests__/conversation-abort-tool-results.test.ts +++ b/assistant/src/__tests__/conversation-abort-tool-results.test.ts @@ -139,6 +139,7 @@ mock.module("../memory/conversation-crud.js", () => ({ getMessageById: () => null, getLastUserTimestampBefore: () => 0, reserveMessage: mock(async () => ({ id: "msg-reserve" })), + updateMessageContent: mock(() => {}), })); mock.module("../memory/conversation-queries.js", () => ({ @@ -193,6 +194,9 @@ mock.module("../agent/loop.js", () => ({ onEvent: (event: AgentEvent) => void, _signal?: AbortSignal, ): Promise { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); const history = [...messages]; // Simulate provider response with 2 tool_use blocks diff --git a/assistant/src/__tests__/conversation-agent-loop-overflow.test.ts b/assistant/src/__tests__/conversation-agent-loop-overflow.test.ts index cba8d6d93b6..03464a59cf2 100644 --- a/assistant/src/__tests__/conversation-agent-loop-overflow.test.ts +++ b/assistant/src/__tests__/conversation-agent-loop-overflow.test.ts @@ -736,6 +736,9 @@ describe("session-agent-loop overflow recovery (JARVIS-110)", () => { let agentLoopCallCount = 0; const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); agentLoopCallCount++; if (agentLoopCallCount === 1) { // Simulate: agent makes progress (tool calls + results added) @@ -915,6 +918,9 @@ describe("session-agent-loop overflow recovery (JARVIS-110)", () => { }; const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); callCount++; if (callCount === 1) { // Provider rejects with "prompt is too long: 242201 tokens > 200000" @@ -1037,6 +1043,9 @@ describe("session-agent-loop overflow recovery (JARVIS-110)", () => { }; const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); callCount++; if (callCount === 1) { // Provider rejects: actual tokens 242201, way above estimate of 185k @@ -1162,6 +1171,9 @@ describe("session-agent-loop overflow recovery (JARVIS-110)", () => { }; const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); callCount++; onEvent({ type: "message_complete", @@ -1250,6 +1262,9 @@ describe("session-agent-loop overflow recovery (JARVIS-110)", () => { let agentLoopCallCount = 0; const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); agentLoopCallCount++; if (agentLoopCallCount === 1) { // Agent makes progress (tool calls succeed, messages grow) @@ -1441,6 +1456,9 @@ describe("session-agent-loop overflow recovery (JARVIS-110)", () => { _requestId, onCheckpoint, ) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); agentLoopCallCount++; if (agentLoopCallCount === 1) { @@ -1626,6 +1644,9 @@ describe("session-agent-loop overflow recovery (JARVIS-110)", () => { _requestId, onCheckpoint, ) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); agentLoopCallCount++; if (agentLoopCallCount === 1) { @@ -1801,6 +1822,9 @@ describe("session-agent-loop overflow recovery (JARVIS-110)", () => { _requestId, onCheckpoint, ) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); agentLoopCallCount++; // Every call: simulate tool progress then yield at checkpoint @@ -1962,6 +1986,9 @@ describe("session-agent-loop overflow recovery (JARVIS-110)", () => { _requestId, onCheckpoint, ) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); agentLoopCallCount++; const withProgress: Message[] = [ @@ -2194,6 +2221,9 @@ describe("session-agent-loop overflow recovery (JARVIS-110)", () => { }; const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); onEvent({ type: "message_complete", message: { @@ -2297,6 +2327,9 @@ describe("session-agent-loop overflow recovery (JARVIS-110)", () => { _requestId, onCheckpoint, ) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); agentLoopCallCount++; const withProgress: Message[] = [ diff --git a/assistant/src/__tests__/conversation-agent-loop.test.ts b/assistant/src/__tests__/conversation-agent-loop.test.ts index 6afebd292f8..0ae4605bfdf 100644 --- a/assistant/src/__tests__/conversation-agent-loop.test.ts +++ b/assistant/src/__tests__/conversation-agent-loop.test.ts @@ -198,6 +198,7 @@ mock.module("../memory/conversation-crud.js", () => ({ getMessageById: () => mockMessageById, getLastUserTimestampBefore: () => 0, reserveMessage: mock(async () => ({ id: "msg-reserve" })), + updateMessageContent: mock(() => {}), })); afterAll(() => { @@ -784,6 +785,9 @@ describe("session-agent-loop", () => { _requestId, onCheckpoint, ) => { + // Prime the assistant row anchor for LLM call 1 — production code + // emits this from `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); await onEvent({ type: "message_complete", message: { @@ -809,6 +813,9 @@ describe("session-agent-loop", () => { hasToolUse: true, history: messages, }); + // Prime the anchor again for LLM call 2 — multi-call agent turns + // reserve a fresh assistant row per LLM call. + await onEvent({ type: "llm_call_started" }); await onEvent({ type: "message_complete", message: { @@ -1065,6 +1072,9 @@ describe("session-agent-loop", () => { const events: ServerMessage[] = []; const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); // Simulate tool_use + error during execution onEvent({ type: "tool_use", @@ -1114,6 +1124,9 @@ describe("session-agent-loop", () => { const events: ServerMessage[] = []; const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); onEvent({ type: "message_complete", message: { @@ -1174,6 +1187,9 @@ describe("session-agent-loop", () => { }; const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); onEvent({ type: "message_complete", message: { @@ -1239,6 +1255,9 @@ describe("session-agent-loop", () => { }; const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); onEvent({ type: "message_complete", message: { @@ -1321,6 +1340,9 @@ describe("session-agent-loop", () => { }; const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); onEvent({ type: "message_complete", message: { @@ -1389,6 +1411,9 @@ describe("session-agent-loop", () => { }> = []; const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); onEvent({ type: "text_delta", text: "Hi." }); onEvent({ type: "message_complete", @@ -1464,6 +1489,9 @@ describe("session-agent-loop", () => { }> = []; const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); // No text_delta — pure tool-call response onEvent({ type: "message_complete", @@ -1527,6 +1555,9 @@ describe("session-agent-loop", () => { const events: ServerMessage[] = []; const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); onEvent({ type: "message_complete", message: { @@ -1639,6 +1670,9 @@ describe("session-agent-loop", () => { }); const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); onEvent({ type: "message_complete", message: { @@ -1729,6 +1763,11 @@ describe("session-agent-loop", () => { }; const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. Retry branches + // need this on every invocation: each agent-loop iteration reserves + // its own row. + await onEvent({ type: "llm_call_started" }); callCount++; if (callCount === 1) { onEvent({ @@ -1856,6 +1895,11 @@ describe("session-agent-loop", () => { }; const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. Retry branches + // need this on every invocation: each agent-loop iteration reserves + // its own row. + await onEvent({ type: "llm_call_started" }); callCount++; if (callCount === 1) { onEvent({ @@ -1941,6 +1985,11 @@ describe("session-agent-loop", () => { mockOverflowAction = "auto_compress_latest_turn"; const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. Retry branches + // need this on every invocation: each agent-loop iteration reserves + // its own row. + await onEvent({ type: "llm_call_started" }); callCount++; if (callCount <= 2) { onEvent({ @@ -2238,6 +2287,9 @@ describe("session-agent-loop", () => { const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { agentLoopCalls++; + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); onEvent({ type: "message_complete", message: { @@ -2286,6 +2338,11 @@ describe("session-agent-loop", () => { let callCount = 0; const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. Retry branches + // need this on every invocation: each agent-loop iteration reserves + // its own row. + await onEvent({ type: "llm_call_started" }); callCount++; if (callCount === 1) { onEvent({ @@ -2370,6 +2427,11 @@ describe("session-agent-loop", () => { _reqId, onCheckpoint, ) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. Retry branches + // need this on every invocation: each agent-loop iteration reserves + // its own row. + await onEvent({ type: "llm_call_started" }); // Simulate tool use followed by checkpoint onEvent({ type: "tool_use", id: "tu-1", name: "file_read", input: {} }); onEvent({ @@ -2443,6 +2505,11 @@ describe("session-agent-loop", () => { _reqId, onCheckpoint, ) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. Retry branches + // need this on every invocation: each agent-loop iteration reserves + // its own row. + await onEvent({ type: "llm_call_started" }); onEvent({ type: "tool_use", id: "tu-1", name: "file_read", input: {} }); onEvent({ type: "tool_result", @@ -2505,6 +2572,9 @@ describe("session-agent-loop", () => { const abortController = new AbortController(); const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); onEvent({ type: "message_complete", message: { @@ -2565,6 +2635,9 @@ describe("session-agent-loop", () => { resolveAssistantAttachmentsMock.mockClear(); const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); onEvent({ type: "message_complete", message: { @@ -2604,6 +2677,9 @@ describe("session-agent-loop", () => { test("increments turnCount after successful run", async () => { const ctx = makeCtx({ agentLoopRun: async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); onEvent({ type: "message_complete", message: { @@ -2637,6 +2713,9 @@ describe("session-agent-loop", () => { test("clears processing state and abort controller", async () => { const ctx = makeCtx({ agentLoopRun: async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); onEvent({ type: "message_complete", message: { @@ -2700,8 +2779,13 @@ describe("session-agent-loop", () => { const ctx = makeCtx({ agentLoopRun: async ( messages: Message[], - onEvent: (event: AgentEvent) => void, + onEvent: (event: AgentEvent) => void | Promise, ) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. Must be + // awaited so the assistant row is reserved before message_complete + // tries to write into it. + await onEvent({ type: "llm_call_started" }); onEvent({ type: "message_complete", message: { @@ -3909,6 +3993,11 @@ describe("session-agent-loop", () => { let callCount = 0; const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { callCount++; + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. Retry branches + // need this on every invocation: each agent-loop iteration reserves + // its own row. + await onEvent({ type: "llm_call_started" }); if (callCount === 1) { // Trigger convergence path: error + appended assistant message so // updatedHistory.length > preRunHistoryLength at the strip site. @@ -3989,6 +4078,11 @@ describe("session-agent-loop", () => { let callCount = 0; const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { callCount++; + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. Retry branches + // need this on every invocation: each agent-loop iteration reserves + // its own row. + await onEvent({ type: "llm_call_started" }); if (callCount === 1) { onEvent({ type: "error", diff --git a/assistant/src/__tests__/conversation-provider-retry-repair.test.ts b/assistant/src/__tests__/conversation-provider-retry-repair.test.ts index 286d0de9b0c..240a17987f8 100644 --- a/assistant/src/__tests__/conversation-provider-retry-repair.test.ts +++ b/assistant/src/__tests__/conversation-provider-retry-repair.test.ts @@ -186,6 +186,7 @@ mock.module("../memory/conversation-crud.js", () => ({ getMessageById: () => null, getLastUserTimestampBefore: () => 0, reserveMessage: mock(async () => ({ id: "msg-reserve" })), + updateMessageContent: mock(() => {}), })); mock.module("../memory/conversation-queries.js", () => ({ @@ -278,6 +279,9 @@ mock.module("../agent/loop.js", () => ({ onEvent: (event: AgentEvent) => void, _signal?: AbortSignal, ): Promise { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); agentLoopRunCount++; if ( diff --git a/assistant/src/__tests__/conversation-queue.test.ts b/assistant/src/__tests__/conversation-queue.test.ts index f62da7d1082..98cd2ef43c9 100644 --- a/assistant/src/__tests__/conversation-queue.test.ts +++ b/assistant/src/__tests__/conversation-queue.test.ts @@ -194,6 +194,7 @@ mock.module("../memory/conversation-crud.js", () => ({ getMessageById: () => null, getLastUserTimestampBefore: () => 0, reserveMessage: mock(async () => ({ id: "msg-reserve" })), + updateMessageContent: mock(() => {}), })); mock.module("../memory/conversation-queries.js", () => ({ @@ -316,7 +317,7 @@ interface PendingRun { resolve: (history: Message[]) => void; reject: (err: Error) => void; messages: Message[]; - onEvent: (event: AgentEvent) => void; + onEvent: (event: AgentEvent) => void | Promise; onCheckpoint?: ( checkpoint: CheckpointInfo, ) => CheckpointDecision | Promise; @@ -338,7 +339,7 @@ mock.module("../agent/loop.js", () => ({ } async run( messages: Message[], - onEvent: (event: AgentEvent) => void, + onEvent: (event: AgentEvent) => void | Promise, _signal?: AbortSignal, _requestId?: string, onCheckpoint?: ( @@ -472,7 +473,7 @@ async function waitForCondition( * that `runAgentLoop` expects (usage + message_complete) so the conversation * cleanly transitions out of its processing state. */ -function resolveRun(index: number) { +async function resolveRun(index: number) { const run = pendingRuns[index]; if (!run) throw new Error(`No pending run at index ${index}`); // Emit the events runAgentLoop expects @@ -480,14 +481,17 @@ function resolveRun(index: number) { role: "assistant", content: [{ type: "text", text: `reply-${index}` }], }; - run.onEvent({ + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await run.onEvent({ type: "llm_call_started" }); + await run.onEvent({ type: "usage", inputTokens: 10, outputTokens: 5, model: "mock", providerDurationMs: 100, }); - run.onEvent({ type: "message_complete", message: assistantMsg }); + await run.onEvent({ type: "message_complete", message: assistantMsg }); // Return updated history with the assistant message appended run.resolve([...run.messages, assistantMsg]); } @@ -545,7 +549,7 @@ describe("Conversation message queue", () => { expect(conversation.getQueueDepth()).toBe(1); // Complete the first message - resolveRun(0); + await resolveRun(0); await p1; // After the first run resolves, the queue drains and triggers a second run. @@ -558,7 +562,7 @@ describe("Conversation message queue", () => { expect(pendingRuns.length).toBe(2); // Complete the second run - resolveRun(1); + await resolveRun(1); await new Promise((r) => setTimeout(r, 10)); }); @@ -585,7 +589,7 @@ describe("Conversation message queue", () => { expect(conversation.getQueueDepth()).toBe(2); // Complete run 0 → drain pulls msg-2 and msg-3 into ONE batched run. - resolveRun(0); + await resolveRun(0); await p1; await waitForPendingRun(2); @@ -624,7 +628,7 @@ describe("Conversation message queue", () => { expect(combinedUserText).toContain("msg-3"); // Resolve the batched run; message_complete must fan out to both clients. - resolveRun(1); + await resolveRun(1); await new Promise((r) => setTimeout(r, 10)); expect(events2.some((e) => e.type === "message_complete")).toBe(true); @@ -651,7 +655,7 @@ describe("Conversation message queue", () => { expect(result.queued).toBe(true); // Complete first - resolveRun(0); + await resolveRun(0); await p1; await waitForPendingRun(2); @@ -665,7 +669,7 @@ describe("Conversation message queue", () => { }); // Complete second run so the conversation finishes cleanly - resolveRun(1); + await resolveRun(1); await new Promise((r) => setTimeout(r, 10)); }); @@ -771,7 +775,7 @@ describe("Conversation message queue", () => { // Complete first → drain pulls all three same-interface passthroughs // into a single batched run (depth → 0, runs → 2 total). - resolveRun(0); + await resolveRun(0); await p1; await waitForPendingRun(2); @@ -779,7 +783,7 @@ describe("Conversation message queue", () => { expect(pendingRuns.length).toBe(2); // Complete the batched run; conversation finishes cleanly. - resolveRun(1); + await resolveRun(1); await new Promise((r) => setTimeout(r, 10)); }); @@ -808,7 +812,7 @@ describe("Conversation message queue", () => { // Complete first message — triggers drain. The empty message should fail // to persist, but the drain should continue to msg-3. - resolveRun(0); + await resolveRun(0); await p1; // msg-3 should have been dequeued and started a new AgentLoop.run @@ -825,7 +829,7 @@ describe("Conversation message queue", () => { expect(events3.some((e) => e.type === "message_dequeued")).toBe(true); // Complete the third message's run - resolveRun(1); + await resolveRun(1); await new Promise((r) => setTimeout(r, 10)); // msg-3 should have completed successfully @@ -900,7 +904,7 @@ describe("Batched drain", () => { expect(conversation.getQueueDepth()).toBe(4); // Resolve msg-1 → batched run pulls macos msg-2 + msg-3. - resolveRun(0); + await resolveRun(0); await p1; await waitForPendingRun(2); @@ -928,7 +932,7 @@ describe("Batched drain", () => { ); // Resolve the batched run → drain pulls the cli single-message run. - resolveRun(1); + await resolveRun(1); await waitForPendingRun(3); // cli run contains msg-4 as a single-message run. @@ -943,7 +947,7 @@ describe("Batched drain", () => { ); // Resolve the cli run → drain pulls the final macos single-message run. - resolveRun(2); + await resolveRun(2); await waitForPendingRun(4); const finalHistory = pendingRuns[3].messages; const finalUserText = finalHistory @@ -958,7 +962,7 @@ describe("Batched drain", () => { // Four total runs: msg-1, batched [msg-2, msg-3], msg-4, msg-5. expect(pendingRuns.length).toBe(4); - resolveRun(3); + await resolveRun(3); await new Promise((r) => setTimeout(r, 10)); }); @@ -1000,7 +1004,7 @@ describe("Batched drain", () => { // Resolve msg-1 → drain pulls "hello" as its own run (batch stops at // /compact boundary). - resolveRun(0); + await resolveRun(0); await p1; await waitForPendingRun(2); @@ -1011,7 +1015,7 @@ describe("Batched drain", () => { // Resolve "hello" → drain pops /compact via the builder-rejected path, // runs its short-circuit (no new runAgentLoop), then drains "world". - resolveRun(1); + await resolveRun(1); await waitForPendingRun(3); // /compact should have emitted its own message_complete via the short- @@ -1020,7 +1024,7 @@ describe("Batched drain", () => { expect(eventsWorld.some((e) => e.type === "message_dequeued")).toBe(true); expect(pendingRuns.length).toBe(3); - resolveRun(2); + await resolveRun(2); await new Promise((r) => setTimeout(r, 10)); }); @@ -1074,7 +1078,7 @@ describe("Batched drain", () => { // Resolve msg-1 → drain pulls "plain-a" as its own run (batch stops at // the /status boundary). - resolveRun(0); + await resolveRun(0); await p1; await waitForPendingRun(2); @@ -1087,7 +1091,7 @@ describe("Batched drain", () => { // runs its unknown-slash short-circuit (no new runAgentLoop, emits // assistant_text_delta + message_complete inline), then drains "plain-b" // as its own run. - resolveRun(1); + await resolveRun(1); await waitForPendingRun(3); // /status should have emitted its own assistant_text_delta + message_complete @@ -1101,7 +1105,7 @@ describe("Batched drain", () => { // without a runAgentLoop invocation. expect(pendingRuns.length).toBe(3); - resolveRun(2); + await resolveRun(2); await new Promise((r) => setTimeout(r, 10)); }); @@ -1137,7 +1141,7 @@ describe("Batched drain", () => { conversation.enqueueMessage("with-B", attachB, () => {}, "req-B"); expect(conversation.getQueueDepth()).toBe(2); - resolveRun(0); + await resolveRun(0); await p1; await waitForPendingRun(2); @@ -1171,7 +1175,7 @@ describe("Batched drain", () => { expect(allText).toContain("a.png"); expect(allText).toContain("b.png"); - resolveRun(1); + await resolveRun(1); await new Promise((r) => setTimeout(r, 10)); }); @@ -1232,13 +1236,13 @@ describe("Batched drain", () => { } // Complete in-flight → drain pulls both queued passthroughs as ONE batched run. - resolveRun(0); + await resolveRun(0); await p1; await waitForPendingRun(2); expect(conversation.getQueueDepth()).toBe(0); // Resolve the batched run. - resolveRun(1); + await resolveRun(1); await new Promise((r) => setTimeout(r, 10)); // After the full drain, the byte budget must be fully reclaimed — a fresh @@ -1255,10 +1259,10 @@ describe("Batched drain", () => { .queued, ).toBe(true); - resolveRun(2); + await resolveRun(2); await p2; await waitForPendingRun(4); - resolveRun(3); + await resolveRun(3); await new Promise((r) => setTimeout(r, 10)); }); }); @@ -1308,7 +1312,7 @@ describe("Batched drain correctness fixes", () => { // Complete run 0 → drain must NOT batch the surface-action with the // regular passthrough. Expect the surface-action to drain as a single // run first. - resolveRun(0); + await resolveRun(0); await p1; await waitForPendingRun(2); @@ -1323,7 +1327,7 @@ describe("Batched drain correctness fixes", () => { // Complete the surface-action run; drain pulls the regular passthrough // as its own separate run. - resolveRun(1); + await resolveRun(1); await waitForPendingRun(3); expect(pendingRuns.length).toBe(3); expect( @@ -1332,7 +1336,7 @@ describe("Batched drain correctness fixes", () => { // Total runs = 3: msg-1, surface-action, regular — NOT 2 (would mean // they were batched). - resolveRun(2); + await resolveRun(2); await new Promise((r) => setTimeout(r, 10)); }); @@ -1383,7 +1387,7 @@ describe("Batched drain correctness fixes", () => { ).length; // Complete run 0 → drain pulls the sibling batch. - resolveRun(0); + await resolveRun(0); await p1; // Give the drain loop a chance to iterate. Abort happens on msg-3's @@ -1448,7 +1452,7 @@ describe("Batched drain correctness fixes", () => { expect(conversation.getQueueDepth()).toBe(3); // Complete run 0 → batched drain. - resolveRun(0); + await resolveRun(0); await p1; await waitForPendingRun(2); @@ -1465,7 +1469,7 @@ describe("Batched drain correctness fixes", () => { ).toBe("req-tail"); // Cleanup: resolve the batched run. - resolveRun(1); + await resolveRun(1); await new Promise((r) => setTimeout(r, 20)); }); @@ -1512,12 +1516,12 @@ describe("Batched drain correctness fixes", () => { "req-fanout-tail", ); - resolveRun(0); + await resolveRun(0); await p1; await waitForPendingRun(2); // Drive the batched run to emit message_complete via fanOutOnEvent. - resolveRun(1); + await resolveRun(1); await new Promise((r) => setTimeout(r, 20)); expect(events3.find((e) => e.type === "error")).toBeDefined(); @@ -1550,7 +1554,7 @@ describe("Batched drain correctness fixes", () => { conversation.enqueueMessage("msg-4", [], () => {}, "req-4"); // Complete run 0 → drain pulls the batched siblings as ONE run. - resolveRun(0); + await resolveRun(0); await p1; await waitForPendingRun(2); @@ -1570,7 +1574,7 @@ describe("Batched drain correctness fixes", () => { requestId: "req-2", // head's requestId, per the fix }); - resolveRun(1); + await resolveRun(1); await new Promise((r) => setTimeout(r, 10)); }); @@ -1619,9 +1623,9 @@ describe("Conversation queue policy helpers", () => { expect(conversation.hasQueuedMessages()).toBe(true); // Cleanup: resolve the pending run - resolveRun(0); + await resolveRun(0); await waitForPendingRun(2); - resolveRun(1); + await resolveRun(1); await new Promise((r) => setTimeout(r, 10)); }); @@ -1646,7 +1650,7 @@ describe("Conversation queue policy helpers", () => { expect(conversation.canHandoffAtCheckpoint()).toBe(false); // Cleanup - resolveRun(0); + await resolveRun(0); await new Promise((r) => setTimeout(r, 10)); }); @@ -1666,9 +1670,9 @@ describe("Conversation queue policy helpers", () => { expect(conversation.canHandoffAtCheckpoint()).toBe(true); // Cleanup - resolveRun(0); + await resolveRun(0); await waitForPendingRun(2); - resolveRun(1); + await resolveRun(1); await new Promise((r) => setTimeout(r, 10)); }); @@ -1733,7 +1737,7 @@ describe("Conversation checkpoint handoff", () => { expect(decision).toBe("yield"); // Complete the run so the conversation finishes cleanly - resolveRun(0); + await resolveRun(0); await p1; // After yield, the first message should emit generation_handoff @@ -1748,7 +1752,7 @@ describe("Conversation checkpoint handoff", () => { // The queued message should now be draining (second run started) await waitForPendingRun(2); - resolveRun(1); + await resolveRun(1); await new Promise((r) => setTimeout(r, 10)); }); @@ -1776,7 +1780,7 @@ describe("Conversation checkpoint handoff", () => { expect(decision).toBe("continue"); // Cleanup - resolveRun(0); + await resolveRun(0); await p1; }); @@ -1816,7 +1820,7 @@ describe("Conversation checkpoint handoff", () => { expect(decision).toBe("yield"); // Complete first run - resolveRun(0); + await resolveRun(0); await p1; // The yielded drain pulls ALL THREE queued siblings as ONE batched run — @@ -1830,7 +1834,7 @@ describe("Conversation checkpoint handoff", () => { expect(events4.some((e) => e.type === "message_dequeued")).toBe(true); // Resolve the batched run — message_complete fans out to all three clients. - resolveRun(1); + await resolveRun(1); await new Promise((r) => setTimeout(r, 10)); expect(events2.some((e) => e.type === "message_complete")).toBe(true); @@ -1874,7 +1878,7 @@ describe("Conversation checkpoint handoff", () => { expect(decision).toBe("yield"); // Complete the run (AgentLoop resolves after yielding) - resolveRun(0); + await resolveRun(0); await p1; // Verify generation_handoff was emitted (not plain message_complete) @@ -1897,7 +1901,7 @@ describe("Conversation checkpoint handoff", () => { expect(events2.some((e) => e.type === "message_dequeued")).toBe(true); // Complete the second run - resolveRun(1); + await resolveRun(1); await new Promise((r) => setTimeout(r, 10)); }); @@ -1967,7 +1971,7 @@ describe("Conversation checkpoint handoff", () => { history: [], }), ).toBe("yield"); - resolveRun(0); + await resolveRun(0); await pA; // B should be draining @@ -1984,7 +1988,7 @@ describe("Conversation checkpoint handoff", () => { history: [], }), ).toBe("yield"); - resolveRun(1); + await resolveRun(1); await waitForPendingRun(3); // Handoff from C -> D @@ -1999,7 +2003,7 @@ describe("Conversation checkpoint handoff", () => { history: [], }), ).toBe("yield"); - resolveRun(2); + await resolveRun(2); await waitForPendingRun(4); // D has no more queued -> checkpoint should return 'continue' @@ -2014,7 +2018,7 @@ describe("Conversation checkpoint handoff", () => { }), ).toBe("continue"); - resolveRun(3); + await resolveRun(3); await new Promise((r) => setTimeout(r, 10)); // Verify FIFO dequeue order @@ -2044,7 +2048,7 @@ describe("Conversation checkpoint handoff", () => { expect(conversation.getQueueDepth()).toBe(2); // Complete message A — triggers drain. B should fail, C should proceed. - resolveRun(0); + await resolveRun(0); await pA; // C should have been dequeued and started a new AgentLoop.run @@ -2061,7 +2065,7 @@ describe("Conversation checkpoint handoff", () => { expect(eventsC.some((e) => e.type === "message_dequeued")).toBe(true); // Complete C's run - resolveRun(1); + await resolveRun(1); await new Promise((r) => setTimeout(r, 10)); // C should have completed successfully @@ -2098,7 +2102,7 @@ describe("Conversation checkpoint handoff", () => { expect(pendingRuns[1].onCheckpoint).toBeDefined(); // Complete retry cleanly - resolveRun(1); + await resolveRun(1); await p1; }); }); @@ -2121,7 +2125,7 @@ describe("Conversation usage requestId correlation", () => { await waitForPendingRun(1); // Complete the run — this triggers recordUsage with the request's ID - resolveRun(0); + await resolveRun(0); await p1; // The usage event should carry the request ID, not null @@ -2159,7 +2163,7 @@ describe("Terminal trace events on rejection/failure", () => { conversation.enqueueMessage("msg-3", [], () => {}, "req-3"); // Complete first — triggers drain, empty msg fails persist - resolveRun(0); + await resolveRun(0); await p1; await waitForPendingRun(2); @@ -2174,7 +2178,7 @@ describe("Terminal trace events on rejection/failure", () => { expect(errorTrace).toBeDefined(); // Cleanup - resolveRun(1); + await resolveRun(1); await new Promise((r) => setTimeout(r, 10)); }); }); @@ -2216,6 +2220,7 @@ describe("Conversation host attachment directives", () => { }, ], }; + await run.onEvent({ type: "llm_call_started" }); run.onEvent({ type: "usage", inputTokens: 10, @@ -2285,6 +2290,7 @@ describe("Conversation host attachment directives", () => { }, ], }; + await run.onEvent({ type: "llm_call_started" }); run.onEvent({ type: "usage", inputTokens: 10, @@ -2378,6 +2384,7 @@ describe("Conversation attachment event payloads", () => { } as any, ], }); + await run.onEvent({ type: "llm_call_started" }); run.onEvent({ type: "usage", inputTokens: 10, @@ -2453,6 +2460,7 @@ describe("Conversation attachment event payloads", () => { } as any, ], }); + await run.onEvent({ type: "llm_call_started" }); run.onEvent({ type: "usage", inputTokens: 10, @@ -2479,7 +2487,7 @@ describe("Conversation attachment event payloads", () => { expect(attachments[0].data).toBe("iVBORw0K"); await waitForPendingRun(2); - resolveRun(1); + await resolveRun(1); await new Promise((r) => setTimeout(r, 10)); }); }); @@ -2511,7 +2519,7 @@ describe("Regression: cancel semantics and error channel split", () => { conversation.abort(); // Resolve the pending run so the abort-check path fires - resolveRun(0); + await resolveRun(0); await p1; // generation_cancelled should be emitted via the per-message callback @@ -2557,6 +2565,7 @@ describe("Regression: cancel semantics and error channel split", () => { } as any, ], }); + await run.onEvent({ type: "llm_call_started" }); run.onEvent({ type: "usage", inputTokens: 10, @@ -2681,7 +2690,7 @@ describe("Regression: cancel semantics and error channel split", () => { conversation.enqueueMessage("msg-2", [], (e) => events2.push(e), "req-2"); // Complete the first agent loop run - resolveRun(0); + await resolveRun(0); // The turn should still complete (timeout fires) and drain the queue // even though commitTurnChanges never resolves. @@ -2702,7 +2711,7 @@ describe("Regression: cancel semantics and error channel split", () => { // Complete the second run so the test can clean up turnCommitHangForever = false; - resolveRun(1); + await resolveRun(1); await new Promise((r) => origSetTimeout(r, 10)); } finally { turnCommitHangForever = false; diff --git a/assistant/src/__tests__/conversation-slash-queue.test.ts b/assistant/src/__tests__/conversation-slash-queue.test.ts index ae319f04609..b5f7d78c586 100644 --- a/assistant/src/__tests__/conversation-slash-queue.test.ts +++ b/assistant/src/__tests__/conversation-slash-queue.test.ts @@ -136,6 +136,7 @@ mock.module("../memory/conversation-crud.js", () => ({ getMessageById: () => null, getLastUserTimestampBefore: () => 0, reserveMessage: mock(async () => ({ id: "msg-reserve" })), + updateMessageContent: mock(() => {}), })); mock.module("../memory/conversation-queries.js", () => ({ @@ -205,7 +206,7 @@ mock.module("../config/skill-state.js", () => ({ interface PendingRun { resolve: (history: Message[]) => void; messages: Message[]; - onEvent: (event: AgentEvent) => void; + onEvent: (event: AgentEvent) => void | Promise; } let pendingRuns: PendingRun[] = []; @@ -224,7 +225,7 @@ mock.module("../agent/loop.js", () => ({ } async run( messages: Message[], - onEvent: (event: AgentEvent) => void, + onEvent: (event: AgentEvent) => void | Promise, _signal?: AbortSignal, _requestId?: string, _onCheckpoint?: ( @@ -334,21 +335,24 @@ async function waitForPendingRun( } } -function resolveRun(index: number) { +async function resolveRun(index: number) { const run = pendingRuns[index]; if (!run) throw new Error(`No pending run at index ${index}`); const assistantMsg: Message = { role: "assistant", content: [{ type: "text", text: `reply-${index}` }], }; - run.onEvent({ + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await run.onEvent({ type: "llm_call_started" }); + await run.onEvent({ type: "usage", inputTokens: 10, outputTokens: 5, model: "mock", providerDurationMs: 100, }); - run.onEvent({ type: "message_complete", message: assistantMsg }); + await run.onEvent({ type: "message_complete", message: assistantMsg }); run.resolve([...run.messages, assistantMsg]); } @@ -409,7 +413,7 @@ describe("Conversation queue — slash-like messages pass through to agent loop" expect(conversation.getQueueDepth()).toBe(2); // Complete first run — drain pulls both queued messages into one batched run. - resolveRun(0); + await resolveRun(0); await p1; await waitForPendingRun(2); @@ -420,7 +424,7 @@ describe("Conversation queue — slash-like messages pass through to agent loop" // Exactly 2 runs total: msg-1 + batched [/not-a-skill, msg-3]. expect(pendingRuns.length).toBe(2); - resolveRun(1); + await resolveRun(1); await new Promise((r) => setTimeout(r, 50)); }); @@ -449,7 +453,7 @@ describe("Conversation queue — slash-like messages pass through to agent loop" ); // Complete first run — triggers drain - resolveRun(0); + await resolveRun(0); await p1; await waitForPendingRun(2); @@ -464,7 +468,7 @@ describe("Conversation queue — slash-like messages pass through to agent loop" // Content passes through as-is — no rewriting expect(text).toContain("/start-the-day"); - resolveRun(1); + await resolveRun(1); await new Promise((r) => setTimeout(r, 50)); }); @@ -495,7 +499,7 @@ describe("Conversation queue — slash-like messages pass through to agent loop" expect(conversation.getQueueDepth()).toBe(3); // Resolve msg-1 → drain pulls only "hi" (batch builder stops at /compact). - resolveRun(0); + await resolveRun(0); await p1; await waitForPendingRun(2); @@ -504,14 +508,14 @@ describe("Conversation queue — slash-like messages pass through to agent loop" // Resolve "hi" → /compact short-circuits without a new runAgentLoop, then // drains "bye" as its own run. - resolveRun(1); + await resolveRun(1); await waitForPendingRun(3); expect(eventsCompact.some((e) => e.type === "message_complete")).toBe(true); expect(eventsBye.some((e) => e.type === "message_dequeued")).toBe(true); expect(pendingRuns.length).toBe(3); - resolveRun(2); + await resolveRun(2); await new Promise((r) => setTimeout(r, 50)); }); diff --git a/assistant/src/__tests__/conversation-slash-unknown.test.ts b/assistant/src/__tests__/conversation-slash-unknown.test.ts index 098f6174810..aa4f2049be5 100644 --- a/assistant/src/__tests__/conversation-slash-unknown.test.ts +++ b/assistant/src/__tests__/conversation-slash-unknown.test.ts @@ -137,6 +137,7 @@ mock.module("../memory/conversation-crud.js", () => ({ getMessageById: () => null, getLastUserTimestampBefore: () => 0, reserveMessage: mock(async () => ({ id: "msg-reserve" })), + updateMessageContent: mock(() => {}), })); mock.module("../memory/conversation-queries.js", () => ({ @@ -255,6 +256,9 @@ mock.module("../agent/loop.js", () => ({ checkpoint: CheckpointInfo, ) => CheckpointDecision | Promise, ): Promise { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); agentLoopRunCalled = true; const assistantMsg: Message = { role: "assistant", diff --git a/assistant/src/__tests__/conversation-workspace-injection.test.ts b/assistant/src/__tests__/conversation-workspace-injection.test.ts index c1ccac68ad2..8adadae24c0 100644 --- a/assistant/src/__tests__/conversation-workspace-injection.test.ts +++ b/assistant/src/__tests__/conversation-workspace-injection.test.ts @@ -146,6 +146,7 @@ mock.module("../memory/conversation-crud.js", () => ({ getConversationOverrideProfileFromRow: () => undefined, updateMessageMetadata: () => {}, reserveMessage: mock(async () => ({ id: "msg-reserve" })), + updateMessageContent: mock(() => {}), })); mock.module("../memory/conversation-queries.js", () => ({ @@ -245,6 +246,9 @@ mock.module("../agent/loop.js", () => ({ messages: Message[], onEvent: (event: AgentEvent) => void, ): Promise { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); runCalls.push(messages); agentLoopScript(onEvent); onEvent({ diff --git a/assistant/src/__tests__/conversation-workspace-tool-tracking.test.ts b/assistant/src/__tests__/conversation-workspace-tool-tracking.test.ts index 17b01ce7f4c..7a7dafeb875 100644 --- a/assistant/src/__tests__/conversation-workspace-tool-tracking.test.ts +++ b/assistant/src/__tests__/conversation-workspace-tool-tracking.test.ts @@ -143,6 +143,7 @@ mock.module("../memory/conversation-crud.js", () => ({ getConversationOverrideProfileFromRow: () => undefined, updateMessageMetadata: () => {}, reserveMessage: mock(async () => ({ id: "msg-reserve" })), + updateMessageContent: mock(() => {}), })); mock.module("../memory/conversation-queries.js", () => ({ @@ -229,6 +230,9 @@ mock.module("../agent/loop.js", () => ({ messages: Message[], onEvent: (event: AgentEvent) => void, ): Promise { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); agentLoopScript(onEvent); onEvent({ type: "usage", diff --git a/assistant/src/__tests__/outbound-slack-persistence.test.ts b/assistant/src/__tests__/outbound-slack-persistence.test.ts index 89c25f0cdc5..20072328b74 100644 --- a/assistant/src/__tests__/outbound-slack-persistence.test.ts +++ b/assistant/src/__tests__/outbound-slack-persistence.test.ts @@ -107,11 +107,47 @@ mock.module("../memory/conversation-crud.js", () => ({ : {}; row.metadata = JSON.stringify({ ...existing, ...updates }); }, - updateMessageContent: () => {}, + updateMessageContent: (messageId: string, content: string) => { + // Mirror updateContent into the same capture array so existing + // `lastAssistantPersisted()` assertions continue to find the row that + // was reserved at `llm_call_started` time. + const row = persistedRows.find((candidate) => candidate.id === messageId); + if (row) row.content = content; + const call = addMessageCalls.find((c) => c.id === messageId); + if (call) call.content = content; + }, // The handler treats provenance as a flat spread; returning {} keeps the // metadata snapshot focused on the fields under test. provenanceFromTrustContext: () => ({}), - reserveMessage: mock(async () => ({ id: "msg-reserve" })), + reserveMessage: mock( + async ( + conversationId: string, + role: string, + metadata?: Record, + ) => { + // B3: production code creates the assistant row at `llm_call_started` + // via `reserveMessage`, stamping channel metadata at reserve time. + // Mirror that into the addMessage capture array so existing + // `lastAssistantPersisted()` assertions keep working. + const id = `mock-msg-${addMessageCalls.length + 1}-reserve`; + addMessageCalls.push({ + id, + conversationId, + role, + content: "", + metadata, + }); + persistedRows.push({ + id, + conversationId, + role, + content: "", + createdAt: Date.now(), + metadata: metadata ? JSON.stringify(metadata) : null, + }); + return { id }; + }, + ), })); mock.module("../memory/llm-request-log-store.js", () => ({ @@ -148,6 +184,7 @@ import type { } from "../daemon/conversation-agent-loop-handlers.js"; import { createEventHandlerState, + handleLlmCallStarted, handleMessageComplete, } from "../daemon/conversation-agent-loop-handlers.js"; import type { ServerMessage } from "../daemon/message-protocol.js"; @@ -256,6 +293,7 @@ describe("outbound assistant Slack metadata persistence", () => { assistantMessageChannel: "slack", requesterChatId: channelId, }); + await handleLlmCallStarted(state, deps); await handleMessageComplete(state, deps, makeMessageCompleteEvent("hi")); const persisted = lastAssistantPersisted(); @@ -297,6 +335,7 @@ describe("outbound assistant Slack metadata persistence", () => { requesterTimezoneLabel: "ET", clientTimezone: "America/Los_Angeles", }); + await handleLlmCallStarted(state, deps); await handleMessageComplete( state, deps, @@ -325,6 +364,7 @@ describe("outbound assistant Slack metadata persistence", () => { requesterChatId: channelId, clientTimezone: "America/Los_Angeles", }); + await handleLlmCallStarted(state, deps); await handleMessageComplete( state, deps, @@ -352,6 +392,7 @@ describe("outbound assistant Slack metadata persistence", () => { requesterChatId: channelId, requesterTimezoneLabel: "ET", }); + await handleLlmCallStarted(state, deps); await handleMessageComplete( state, deps, @@ -395,6 +436,7 @@ describe("outbound assistant Slack metadata persistence", () => { assistantMessageChannel: "slack", requesterChatId: channelId, }); + await handleLlmCallStarted(state, deps); await handleMessageComplete(state, deps, makeMessageCompleteEvent("hello")); const persisted = lastAssistantPersisted(); @@ -426,6 +468,7 @@ describe("outbound assistant Slack metadata persistence", () => { assistantMessageChannel: "slack", requesterChatId: channelId, }); + await handleLlmCallStarted(state, deps); await handleMessageComplete( state, deps, @@ -447,6 +490,7 @@ describe("outbound assistant Slack metadata persistence", () => { const deps = makeDeps(conversationId, { assistantMessageChannel: "vellum", }); + await handleLlmCallStarted(state, deps); await handleMessageComplete( state, deps, diff --git a/assistant/src/__tests__/persistence-secret-redaction.test.ts b/assistant/src/__tests__/persistence-secret-redaction.test.ts index ce926835754..798dfba6021 100644 --- a/assistant/src/__tests__/persistence-secret-redaction.test.ts +++ b/assistant/src/__tests__/persistence-secret-redaction.test.ts @@ -40,6 +40,7 @@ mock.module("../config/loader.js", () => ({ })); interface AddMessageCall { + id: string; conversationId: string; role: string; content: string; @@ -53,14 +54,41 @@ mock.module("../memory/conversation-crud.js", () => ({ content: string, metadata?: Record, ) => { - addMessageCalls.push({ conversationId, role, content, metadata }); - return { id: `mock-msg-${addMessageCalls.length}` }; + const id = `mock-msg-${addMessageCalls.length + 1}`; + addMessageCalls.push({ id, conversationId, role, content, metadata }); + return { id }; }, getConversation: () => null, getMessageById: () => null, - updateMessageContent: () => {}, + updateMessageContent: (messageId: string, content: string) => { + // Mirror updateContent into the same capture array so existing + // `lastPersisted("assistant")` assertions continue to find the row that + // was reserved at `llm_call_started` time. + const call = addMessageCalls.find((c) => c.id === messageId); + if (call) call.content = content; + }, provenanceFromTrustContext: () => ({}), - reserveMessage: mock(async () => ({ id: "msg-reserve" })), + reserveMessage: mock( + async ( + conversationId: string, + role: string, + metadata?: Record, + ) => { + // B3: production code creates the assistant row at `llm_call_started` + // via `reserveMessage`, stamping channel metadata at reserve time. + // Mirror that into the addMessage capture array so existing + // `lastPersisted("assistant")` assertions keep working. + const id = `mock-msg-${addMessageCalls.length + 1}-reserve`; + addMessageCalls.push({ + id, + conversationId, + role, + content: "", + metadata, + }); + return { id }; + }, + ), })); mock.module("../memory/llm-request-log-store.js", () => ({ @@ -85,6 +113,7 @@ import type { } from "../daemon/conversation-agent-loop-handlers.js"; import { createEventHandlerState, + handleLlmCallStarted, handleMessageComplete, } from "../daemon/conversation-agent-loop-handlers.js"; @@ -165,7 +194,12 @@ describe("persistence-layer secret redaction", () => { isError: false, }); - await handleMessageComplete(state, makeDeps(), makeMessageCompleteEvent("done")); + await handleLlmCallStarted(state, makeDeps()); + await handleMessageComplete( + state, + makeDeps(), + makeMessageCompleteEvent("done"), + ); const persisted = lastPersisted("user"); const blocks = JSON.parse(persisted.content) as Array<{ @@ -184,7 +218,12 @@ describe("persistence-layer secret redaction", () => { isError: false, }); - await handleMessageComplete(state, makeDeps(), makeMessageCompleteEvent("done")); + await handleLlmCallStarted(state, makeDeps()); + await handleMessageComplete( + state, + makeDeps(), + makeMessageCompleteEvent("done"), + ); const persisted = lastPersisted("user"); const blocks = JSON.parse(persisted.content) as Array<{ @@ -196,13 +235,19 @@ describe("persistence-layer secret redaction", () => { }); test("does not redact non-secret content (UUID, hex hash) in tool result", async () => { - const safe = "id=550e8400-e29b-41d4-a716-446655440000 sha=a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2"; + const safe = + "id=550e8400-e29b-41d4-a716-446655440000 sha=a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2"; state.pendingToolResults.set("tool-use-3", { content: safe, isError: false, }); - await handleMessageComplete(state, makeDeps(), makeMessageCompleteEvent("done")); + await handleLlmCallStarted(state, makeDeps()); + await handleMessageComplete( + state, + makeDeps(), + makeMessageCompleteEvent("done"), + ); const persisted = lastPersisted("user"); const blocks = JSON.parse(persisted.content) as Array<{ @@ -225,7 +270,12 @@ describe("persistence-layer secret redaction", () => { // Capture the content before handleMessageComplete clears pendingToolResults const contentSnapshot = state.pendingToolResults.get("tool-use-4")!.content; - await handleMessageComplete(state, makeDeps(), makeMessageCompleteEvent("done")); + await handleLlmCallStarted(state, makeDeps()); + await handleMessageComplete( + state, + makeDeps(), + makeMessageCompleteEvent("done"), + ); // The snapshot taken from live state before the call must be unmodified expect(contentSnapshot).toBe(originalContent); @@ -240,7 +290,12 @@ describe("persistence-layer secret redaction", () => { "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; const text = `Your API key is \`${secret}\`. Keep it safe.`; - await handleMessageComplete(state, makeDeps(), makeMessageCompleteEvent(text)); + await handleLlmCallStarted(state, makeDeps()); + await handleMessageComplete( + state, + makeDeps(), + makeMessageCompleteEvent(text), + ); const persisted = lastPersisted("assistant"); const blocks = JSON.parse(persisted.content) as Array<{ @@ -257,7 +312,12 @@ describe("persistence-layer secret redaction", () => { const secret = "sk-proj-ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrst"; const text = `I found this key in the config: ${secret}`; - await handleMessageComplete(state, makeDeps(), makeMessageCompleteEvent(text)); + await handleLlmCallStarted(state, makeDeps()); + await handleMessageComplete( + state, + makeDeps(), + makeMessageCompleteEvent(text), + ); const persisted = lastPersisted("assistant"); const blocks = JSON.parse(persisted.content) as Array<{ @@ -272,7 +332,12 @@ describe("persistence-layer secret redaction", () => { test("does not redact non-secret text in assistant message", async () => { const safe = "Here is the file list: index.ts, util.ts, main.ts"; - await handleMessageComplete(state, makeDeps(), makeMessageCompleteEvent(safe)); + await handleLlmCallStarted(state, makeDeps()); + await handleMessageComplete( + state, + makeDeps(), + makeMessageCompleteEvent(safe), + ); const persisted = lastPersisted("assistant"); const blocks = JSON.parse(persisted.content) as Array<{ @@ -287,7 +352,12 @@ describe("persistence-layer secret redaction", () => { // High-entropy but no known credential prefix — should NOT be redacted const text = "checksum: 8f14e45fceea167a5a36dedd4bea2543"; - await handleMessageComplete(state, makeDeps(), makeMessageCompleteEvent(text)); + await handleLlmCallStarted(state, makeDeps()); + await handleMessageComplete( + state, + makeDeps(), + makeMessageCompleteEvent(text), + ); const persisted = lastPersisted("assistant"); const blocks = JSON.parse(persisted.content) as Array<{ diff --git a/assistant/src/agent/loop.ts b/assistant/src/agent/loop.ts index dd4dc42928c..981c5389da1 100644 --- a/assistant/src/agent/loop.ts +++ b/assistant/src/agent/loop.ts @@ -119,6 +119,15 @@ export type AgentLoopExitReason = | "error"; export type AgentEvent = + /** + * Emitted once per LLM call inside the loop, immediately before the + * `provider.sendMessage` invocation. Carries the optional `callSite` tag so + * downstream handlers (the daemon's persistence pipeline) can decide + * whether to reserve a row for this call. One `llm_call_started` precedes + * every `message_complete` for the same call; multi-call agent turns emit + * one pair per call. + */ + | { type: "llm_call_started"; callSite?: LLMCallSite } | { type: "text_delta"; text: string } | { type: "thinking_delta"; thinking: string } | { type: "message_complete"; message: Message } @@ -651,6 +660,15 @@ export class AgentLoop { ); rlog.info({ turn: toolUseTurns }, "LLM call start"); + // Announce the LLM-call boundary so downstream handlers (the + // daemon's persistence pipeline) can reserve an empty assistant row + // and stamp the resulting `messageId` onto every streaming event the + // call emits. Awaited so the row is created and the + // `assistant_turn_start` wire event reaches the client BEFORE the + // provider starts streaming deltas — the deltas downstream will + // carry the freshly-reserved id. + await onEvent({ type: "llm_call_started", callSite }); + // Strip image contentBlocks from older tool results to prevent // screenshots from accumulating in the context window. The LLM // already saw each image on the turn it was captured; keeping diff --git a/assistant/src/daemon/conversation-agent-loop-handlers.ts b/assistant/src/daemon/conversation-agent-loop-handlers.ts index a43b9f4ae17..1c0e3f8682b 100644 --- a/assistant/src/daemon/conversation-agent-loop-handlers.ts +++ b/assistant/src/daemon/conversation-agent-loop-handlers.ts @@ -40,8 +40,8 @@ import { defaultPersistenceTerminal } from "../plugins/defaults/persistence.js"; import { DEFAULT_TIMEOUTS, runPipeline } from "../plugins/pipeline.js"; import { getMiddlewaresFor } from "../plugins/registry.js"; import type { - PersistAddResult, PersistArgs, + PersistReserveResult, PersistResult, TurnContext, } from "../plugins/types.js"; @@ -389,6 +389,105 @@ function resolveAssistantReplyTimestampTimezone( }).effectiveTimezone; } +/** + * Assemble the metadata envelope written to the assistant message row. + * + * Stamped at reserve time (before `provider.sendMessage`) so the row carries + * channel provenance from the moment it lands in SQLite, mirroring the + * snapshot that handleMessageComplete used to compute at end-of-turn. All + * inputs (channel context, trust context, turnStartedAt) are stable across + * the LLM call, so building this once at reserve is equivalent to building + * it at complete. Slack reply rows further stamp a `slackMeta` sub-object — + * the `channelTs` field stays absent here and is back-filled by + * `deliverReplyViaCallback` after the gateway returns the ts. + */ +function buildAssistantChannelMetadata( + state: EventHandlerState, + deps: EventHandlerDeps, +): Record { + const metadata: Record = { + ...provenanceFromTrustContext(deps.ctx.trustContext), + userMessageChannel: deps.turnChannelContext.userMessageChannel, + assistantMessageChannel: deps.turnChannelContext.assistantMessageChannel, + userMessageInterface: deps.turnInterfaceContext.userMessageInterface, + assistantMessageInterface: + deps.turnInterfaceContext.assistantMessageInterface, + sentAt: state.turnStartedAt, + }; + + if (deps.turnChannelContext.assistantMessageChannel === "slack") { + const channelId = deps.ctx.trustContext?.requesterChatId; + if (channelId) { + const threadTs = getThreadTs(deps.ctx.conversationId); + const timestampTimezone = resolveAssistantReplyTimestampTimezone( + deps.ctx, + ); + const timestampTimezoneLabel = formatSlackTimezoneLabel( + timestampTimezone, + { nowMs: state.turnStartedAt }, + ); + const partialSlackMeta: Partial = { + source: "slack", + eventKind: "message", + channelId, + ...(threadTs ? { threadTs } : {}), + timestampTimezone, + ...(timestampTimezoneLabel ? { timestampTimezoneLabel } : {}), + }; + // `channelTs` is filled in by the post-send reconciliation step in + // `deliverReplyViaCallback`; cast through the Partial to satisfy + // the writer's type at this pre-send boundary. + metadata.slackMeta = writeSlackMetadata( + partialSlackMeta as SlackMessageMetadata, + ); + } + } + + return metadata; +} + +/** + * Reserve an empty assistant row for the LLM call about to begin, stash + * its id on `state.lastAssistantMessageId`, and announce the boundary on + * the wire via `assistant_turn_start`. + * + * Awaited so the row exists and the client has the anchor id BEFORE any + * streaming delta arrives — every subsequent `deps.onEvent` in this LLM + * call stamps `messageId: state.lastAssistantMessageId`, and + * `handleMessageComplete` flushes the final content to the same row via + * `op: "updateContent"` instead of inserting a fresh one. + * + * Multi-LLM-call agent turns (LLM call → tool execution → LLM call) emit + * one `llm_call_started` per call, so each LLM call reserves its own row. + * The read-path `findDisplayTurnEndIndex` collapses consecutive assistant + * rows for the merged history view, matching today's per-call DB layout. + */ +export async function handleLlmCallStarted( + state: EventHandlerState, + deps: EventHandlerDeps, +): Promise { + const metadata = buildAssistantChannelMetadata(state, deps); + const reserveResult = (await runPipeline( + "persistence", + getMiddlewaresFor("persistence"), + defaultPersistenceTerminal, + { + op: "reserve", + conversationId: deps.ctx.conversationId, + role: "assistant", + metadata, + }, + buildHandlerTurnContext(deps), + DEFAULT_TIMEOUTS.persistence, + )) as PersistReserveResult; + state.lastAssistantMessageId = reserveResult.message.id; + deps.onEvent({ + type: "assistant_turn_start", + messageId: reserveResult.message.id, + conversationId: deps.ctx.conversationId, + }); +} + // ── Individual Handlers ────────────────────────────────────────────── function handleTextDelta( @@ -417,6 +516,7 @@ function handleTextDelta( type: "assistant_text_delta", text: drained.emitText, conversationId: deps.ctx.conversationId, + messageId: state.lastAssistantMessageId, }); if (deps.shouldGenerateTitle) state.firstAssistantText += drained.emitText; } @@ -454,6 +554,7 @@ function handleThinkingDelta( type: "assistant_thinking_delta", thinking: event.thinking, conversationId: deps.ctx.conversationId, + messageId: state.lastAssistantMessageId, }); } @@ -484,11 +585,12 @@ export function handleToolUse( input: event.input, conversationId: deps.ctx.conversationId, toolUseId: event.id, + messageId: state.lastAssistantMessageId, }); } export function handleToolUsePreviewStart( - _state: EventHandlerState, + state: EventHandlerState, deps: EventHandlerDeps, event: Extract, ): void { @@ -497,6 +599,7 @@ export function handleToolUsePreviewStart( toolUseId: event.toolUseId, toolName: event.toolName, conversationId: deps.ctx.conversationId, + messageId: state.lastAssistantMessageId, }); const statusText = `Preparing ${friendlyToolName(event.toolName)}...`; deps.ctx.emitActivityState( @@ -509,7 +612,7 @@ export function handleToolUsePreviewStart( } function handleToolOutputChunk( - _state: EventHandlerState, + state: EventHandlerState, deps: EventHandlerDeps, event: Extract, ): void { @@ -567,6 +670,7 @@ function handleToolOutputChunk( chunk: event.chunk, conversationId: deps.ctx.conversationId, toolUseId: event.toolUseId, + messageId: state.lastAssistantMessageId, subType: structured.subType, subToolName: structured.subToolName, subToolInput: structured.subToolInput, @@ -579,12 +683,13 @@ function handleToolOutputChunk( chunk: event.chunk, conversationId: deps.ctx.conversationId, toolUseId: event.toolUseId, + messageId: state.lastAssistantMessageId, }); } } export function handleInputJsonDelta( - _state: EventHandlerState, + state: EventHandlerState, deps: EventHandlerDeps, event: Extract, ): void { @@ -598,6 +703,7 @@ export function handleInputJsonDelta( content: event.accumulatedJson, conversationId: deps.ctx.conversationId, toolUseId: event.toolUseId, + messageId: state.lastAssistantMessageId, }); } @@ -724,6 +830,7 @@ export function handleToolResult( diff: event.diff, status: event.status, conversationId: deps.ctx.conversationId, + messageId: state.lastAssistantMessageId, imageData: imageDataList?.[0], imageDataList, toolUseId: event.toolUseId, @@ -917,6 +1024,7 @@ export async function handleMessageComplete( type: "assistant_text_delta", text: state.pendingDirectiveDisplayBuffer, conversationId: deps.ctx.conversationId, + messageId: state.lastAssistantMessageId, }); if (deps.shouldGenerateTitle) state.firstAssistantText += state.pendingDirectiveDisplayBuffer; @@ -1023,52 +1131,6 @@ export async function handleMessageComplete( } as unknown as ContentBlock); } - const assistantChannelMetadata: Record = { - ...provenanceFromTrustContext(deps.ctx.trustContext), - userMessageChannel: deps.turnChannelContext.userMessageChannel, - assistantMessageChannel: deps.turnChannelContext.assistantMessageChannel, - userMessageInterface: deps.turnInterfaceContext.userMessageInterface, - assistantMessageInterface: - deps.turnInterfaceContext.assistantMessageInterface, - sentAt: state.turnStartedAt, - }; - - // When the assistant is replying through Slack, stamp a `slackMeta` - // sub-object so the transcript-rendering / thread-aware-context lookup - // can identify this row's thread without joining tables. - // Persistence happens BEFORE the Slack adapter sends the message, so - // Slack's authoritative `ts` (-> `channelTs`) is not yet known and is - // intentionally omitted here. The post-send reconciliation step in - // `deliverReplyViaCallback` writes `channelTs` back into this row once - // the gateway returns the Slack-assigned ts, restoring a fully-formed - // metadata envelope before any subsequent turn reads the row. - if (deps.turnChannelContext.assistantMessageChannel === "slack") { - const channelId = deps.ctx.trustContext?.requesterChatId; - if (channelId) { - const threadTs = getThreadTs(deps.ctx.conversationId); - const timestampTimezone = resolveAssistantReplyTimestampTimezone( - deps.ctx, - ); - const timestampTimezoneLabel = formatSlackTimezoneLabel( - timestampTimezone, - { nowMs: state.turnStartedAt }, - ); - const partialSlackMeta: Partial = { - source: "slack", - eventKind: "message", - channelId, - ...(threadTs ? { threadTs } : {}), - timestampTimezone, - ...(timestampTimezoneLabel ? { timestampTimezoneLabel } : {}), - }; - assistantChannelMetadata.slackMeta = writeSlackMetadata( - // `channelTs` is filled in by the post-send reconciliation step in - // `deliverReplyViaCallback`; cast through the Partial to satisfy - // the writer's type at this pre-send boundary. - partialSlackMeta as SlackMessageMetadata, - ); - } - } // Redact known-pattern secrets from assistant text blocks before they are // written to durable storage. Non-text blocks (images, UI surfaces) pass // through unchanged. The live model history retains the original values. @@ -1080,32 +1142,39 @@ export async function handleMessageComplete( return block; }); - // Route the assistant-message persistence through the `persistence` - // pipeline. No `syncToDisk` here — the orchestrator separately invokes - // `syncMessageToDisk` on `state.lastAssistantMessageId` after the loop - // completes (see `conversation-agent-loop.ts::syncLastAssistantMessageToDisk`). - const assistantPersistResult = (await runPipeline( + // The row was reserved at `llm_call_started` (with channel metadata + // stamped at that point) and `state.lastAssistantMessageId` carries its + // id. Flush the final content via `updateContent` instead of inserting a + // new row. No `syncToDisk` flag here — the orchestrator separately + // invokes `syncMessageToDisk` on `state.lastAssistantMessageId` after + // the loop completes (see + // `conversation-agent-loop.ts::syncLastAssistantMessageToDisk`). + const assistantMessageId = state.lastAssistantMessageId; + if (!assistantMessageId) { + throw new Error( + "handleMessageComplete fired without a prior llm_call_started reserving an assistant row", + ); + } + await runPipeline( "persistence", getMiddlewaresFor("persistence"), defaultPersistenceTerminal, { - op: "add", - conversationId: deps.ctx.conversationId, - role: "assistant", + op: "updateContent", + messageId: assistantMessageId, content: JSON.stringify(contentForPersistence), - metadata: assistantChannelMetadata, }, buildHandlerTurnContext(deps), DEFAULT_TIMEOUTS.persistence, - )) as PersistAddResult; - const assistantMsg = assistantPersistResult.message; - state.lastAssistantMessageId = assistantMsg.id; + ); // Backfill message_id on all LLM request logs from this turn. // The agent loop is single-threaded per conversation, so all rows with - // message_id IS NULL belong to the current turn. + // message_id IS NULL belong to the current turn. The reserved id was + // available before the LLM call ran but the logs are inserted DURING + // the call, so the sweep still runs here. try { - backfillMessageIdOnLogs(deps.ctx.conversationId, assistantMsg.id); + backfillMessageIdOnLogs(deps.ctx.conversationId, assistantMessageId); } catch (err) { deps.rlog.warn( { err }, @@ -1114,7 +1183,10 @@ export async function handleMessageComplete( } try { - backfillMemoryRecallLogMessageId(deps.ctx.conversationId, assistantMsg.id); + backfillMemoryRecallLogMessageId( + deps.ctx.conversationId, + assistantMessageId, + ); } catch (err) { deps.rlog.warn( { err }, @@ -1125,7 +1197,7 @@ export async function handleMessageComplete( try { backfillMemoryV2ActivationMessageId( deps.ctx.conversationId, - assistantMsg.id, + assistantMessageId, ); } catch (err) { deps.rlog.warn( @@ -1295,6 +1367,9 @@ export async function dispatchAgentEvent( ): Promise { try { switch (event.type) { + case "llm_call_started": + await handleLlmCallStarted(state, deps); + break; case "text_delta": handleTextDelta(state, deps, event); break; @@ -1335,6 +1410,7 @@ export async function dispatchAgentEvent( input: event.input, conversationId: deps.ctx.conversationId, toolUseId: event.toolUseId, + messageId: state.lastAssistantMessageId, }); break; } @@ -1414,6 +1490,7 @@ export async function dispatchAgentEvent( isError: event.isError, conversationId: deps.ctx.conversationId, toolUseId: event.toolUseId, + messageId: state.lastAssistantMessageId, ...(metadata ? { activityMetadata: { webSearch: metadata } } : {}), }); break; diff --git a/assistant/src/daemon/wake-target-adapter.ts b/assistant/src/daemon/wake-target-adapter.ts index af17c056a11..666a2e68a6e 100644 --- a/assistant/src/daemon/wake-target-adapter.ts +++ b/assistant/src/daemon/wake-target-adapter.ts @@ -138,6 +138,15 @@ function translateAgentEventToServerMessage( case "provider_error": case "agent_loop_exit": return null; + case "llm_call_started": + // The wake path persists its assistant tail via `persistTailMessage` + // (an `addMessage`-shaped call below) rather than via the main + // event-handler's `reserve` → `updateContent` pipeline, so there is + // no row to reserve here. Translation returns null and the wake + // path's existing end-of-turn persist continues to mint the row. + // Following up with full wake-path pre-allocation parity is tracked + // as a B3 follow-up. + return null; } } From 8d9c6fe78f3d1ee876b6ea9ed3abbcff346121bf Mon Sep 17 00:00:00 2001 From: "vellum-apollo-bot[bot]" <242025090+vellum-apollo-bot[bot]@users.noreply.github.com> Date: Thu, 28 May 2026 14:05:26 +0000 Subject: [PATCH 2/3] B3 review iteration: restore indexing, cleanup paths, llm_call_started anchor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses three review threads from PR #32326 round 1: Codex P1 (indexer/projector drop-out): handleMessageComplete now re-drives indexMessageNow + projectAssistantMessage after updateContent succeeds. The pre-B3 addMessage path ran both as side-effects of insert; reserveMessage + updateMessageContent are CRUD-only so the calls had to come back explicitly. Projector signal still gates a publishSyncInvalidation on the conversation :metadata tag (attention state change only — no flood on no-op turns). Both calls are wrapped in their own try/catch — memory hiccups remain non-fatal to the turn. Codex P2 (synthetic provider-error orphan): Provider-error branch in conversation-agent-loop.ts now deletes the stranded reserved assistant row before inserting the synthetic error message, then repoints state.lastAssistantMessageId at the error row's id so syncLastAssistantMessageToDisk targets a real persisted message. New invariant: state.assistantRowAwaitingFinalization: boolean joins the existing throw guard. Set true in handleLlmCallStarted after reserve, cleared in handleMessageComplete after the successful updateContent. handleLlmCallStarted also uses it to clean up a stranded reservation from a previous LLM call in this run (retry paths — overflow, ordering, image overflow). Reviewer thread (llm_call_started placement): moved emit closer to provider.sendMessage in agent/loop.ts so the reserve race window stays as tight as possible. Tests (conversation-agent-loop.test.ts, new describe block): 4 new tests covering both indexer/projector paths, the strand cleanup, and the provider-error orphan branch. 64/64 pass, typecheck clean. --- .../__tests__/conversation-agent-loop.test.ts | 321 +++++++++++++++++- assistant/src/agent/loop.ts | 21 +- .../conversation-agent-loop-handlers.ts | 136 +++++++- .../src/daemon/conversation-agent-loop.ts | 38 +++ 4 files changed, 503 insertions(+), 13 deletions(-) diff --git a/assistant/src/__tests__/conversation-agent-loop.test.ts b/assistant/src/__tests__/conversation-agent-loop.test.ts index 0ae4605bfdf..b5cff4feae6 100644 --- a/assistant/src/__tests__/conversation-agent-loop.test.ts +++ b/assistant/src/__tests__/conversation-agent-loop.test.ts @@ -176,6 +176,12 @@ let mockConversationRow: Record = { title: null, }; let mockMessageById: Record | null = null; +const deleteMessageByIdMock = mock(() => ({ + segmentIds: [], + deletedSummaryIds: [], +})); +const reserveMessageMock = mock(async () => ({ id: "msg-reserve" })); +const updateMessageContentMock = mock(() => {}); mock.module("../memory/conversation-crud.js", () => ({ setConversationOriginChannelIfUnset: () => {}, updateConversationUsage: () => {}, @@ -189,7 +195,7 @@ mock.module("../memory/conversation-crud.js", () => ({ }), getConversationOriginInterface: () => null, addMessage: () => ({ id: "mock-msg-id" }), - deleteMessageById: () => {}, + deleteMessageById: deleteMessageByIdMock, updateConversationContextWindow: () => {}, updateConversationSlackContextWatermark: updateConversationSlackContextWatermarkMock, @@ -197,8 +203,36 @@ mock.module("../memory/conversation-crud.js", () => ({ getConversationOriginChannel: () => null, getMessageById: () => mockMessageById, getLastUserTimestampBefore: () => 0, - reserveMessage: mock(async () => ({ id: "msg-reserve" })), - updateMessageContent: mock(() => {}), + reserveMessage: reserveMessageMock, + updateMessageContent: updateMessageContentMock, + // The real schema is a Zod object; tests don't exercise validation, + // so a passthrough is sufficient — the production code at + // `handleMessageComplete` only branches on `success` and reads two + // fields off `data`. `safeParse` of an empty object satisfies the + // schema (every field is optional). + messageMetadataSchema: { + safeParse: (input: unknown) => ({ success: true, data: input ?? {} }), + }, +})); + +// The B3 indexing-restoration path imports `indexMessageNow` from +// `../memory/indexer.js` and `projectAssistantMessage` from +// `../memory/conversation-attention-store.js`; without these stubs the +// real modules would try to open a SQLite DB and read a real config. +const indexMessageNowMock = mock(async () => ({ + indexedSegments: 0, + enqueuedJobs: 0, +})); +const projectAssistantMessageMock = mock(() => false); +const publishSyncInvalidationMock = mock(async () => {}); +mock.module("../memory/indexer.js", () => ({ + indexMessageNow: indexMessageNowMock, +})); +mock.module("../memory/conversation-attention-store.js", () => ({ + projectAssistantMessage: projectAssistantMessageMock, +})); +mock.module("../runtime/sync/sync-publisher.js", () => ({ + publishSyncInvalidation: publishSyncInvalidationMock, })); afterAll(() => { @@ -694,6 +728,13 @@ beforeEach(() => { mockSlackChronologicalContext = null; loadSlackChronologicalContextMock.mockClear(); getSlackCompactionWatermarkForPrefixMock.mockClear(); + deleteMessageByIdMock.mockClear(); + reserveMessageMock.mockClear(); + updateMessageContentMock.mockClear(); + indexMessageNowMock.mockClear(); + projectAssistantMessageMock.mockClear(); + publishSyncInvalidationMock.mockClear(); + mockMessageById = null; // Orchestrator pipelines (overflowReduce, persistence, …) run through the // plugin registry; reset and re-register every default so the pipelines // dispatch to middleware backed by the mocked collaborators these tests @@ -3129,6 +3170,280 @@ describe("session-agent-loop", () => { }); }); + describe("B3 pre-allocation: indexing + cleanup", () => { + test("handleMessageComplete indexes and projects the finalized assistant row", async () => { + // The pre-B3 path inserted assistant rows via `addMessage`, which ran + // the memory indexer and the conversation-attention projector as + // side-effects of the insert. B3 splits the write into + // `reserveMessage` + `updateMessageContent`, both of which are CRUD-only, + // so the indexing + projection calls had to be re-driven explicitly + // after `updateContent` succeeds. Codex P1 caught a regression where + // this path was missing entirely; this test pins it down. + mockMessageById = { + id: "msg-reserve", + conversationId: "test-conv", + createdAt: 1234567, + role: "assistant", + content: "[]", + metadata: null, + }; + // Force attention projection to report a state change so we also + // observe the sync-invalidation publish path on the same turn. + projectAssistantMessageMock.mockImplementationOnce(() => true); + + const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + await onEvent({ type: "llm_call_started" }); + // `message_complete` is awaited so `handleMessageComplete` (and its + // async indexer + projector chain) completes before the next event + // or before the loop returns. Without the await the projector's + // synchronous call still races against the test's assertion phase + // because the indexer's `await` yields microtasks. + await onEvent({ + type: "message_complete", + message: { + role: "assistant", + content: [{ type: "text", text: "indexed reply" }], + }, + }); + onEvent({ + type: "usage", + inputTokens: 10, + outputTokens: 5, + model: "test", + providerDurationMs: 50, + }); + return [ + ...messages, + { + role: "assistant" as const, + content: [ + { type: "text", text: "indexed reply" }, + ] as ContentBlock[], + }, + ]; + }; + + const ctx = makeCtx({ agentLoopRun }); + await runAgentLoopImpl(ctx, "hi", "msg-1", () => {}); + + // Indexer fired with the reserved row's id + the finalized content. + expect(indexMessageNowMock).toHaveBeenCalledTimes(1); + const indexCallArgs = indexMessageNowMock.mock.calls[0] as unknown as [ + { + messageId: string; + conversationId: string; + role: string; + content: string; + createdAt: number; + scopeId: string; + }, + unknown, + ]; + const indexCall = indexCallArgs[0]; + expect(indexCall).toMatchObject({ + messageId: "msg-reserve", + conversationId: "test-conv", + role: "assistant", + createdAt: 1234567, + scopeId: "default", + }); + expect(indexCall.content).toContain("indexed reply"); + + // Attention projector fired with the same row coordinates. + expect(projectAssistantMessageMock).toHaveBeenCalledTimes(1); + const projectCall = projectAssistantMessageMock.mock + .calls[0] as unknown as [ + { conversationId: string; messageId: string; messageAt: number }, + ]; + expect(projectCall[0]).toEqual({ + conversationId: "test-conv", + messageId: "msg-reserve", + messageAt: 1234567, + }); + + // Projection reported a state change → sync invalidation fires with + // the conversation `:metadata` tag. The mock also receives a + // `:messages` invalidation from the orchestrator's + // `publishLoopMessagesChanged` post-loop emit, so we filter by tag + // rather than asserting a total call count. + const metadataPublishes = ( + publishSyncInvalidationMock.mock.calls as unknown as Array<[string[]]> + ).filter((args) => args[0]?.includes("conversation:test-conv:metadata")); + expect(metadataPublishes).toHaveLength(1); + }); + + test("handleMessageComplete skips sync invalidation when attention state unchanged", async () => { + // Mirror of the previous test but with the default projector return + // (`false`). The projection still runs every turn, but the sync + // invalidation publish must be gated on attention-state movement to + // avoid flooding clients with no-op metadata refreshes. + mockMessageById = { + id: "msg-reserve", + conversationId: "test-conv", + createdAt: 999, + role: "assistant", + content: "[]", + metadata: null, + }; + + const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + await onEvent({ type: "llm_call_started" }); + // See sibling test — `message_complete` must be awaited so the + // projector call lands before the assertion phase. + await onEvent({ + type: "message_complete", + message: { + role: "assistant", + content: [{ type: "text", text: "quiet" }], + }, + }); + onEvent({ + type: "usage", + inputTokens: 1, + outputTokens: 1, + model: "test", + providerDurationMs: 1, + }); + return [ + ...messages, + { + role: "assistant" as const, + content: [{ type: "text", text: "quiet" }] as ContentBlock[], + }, + ]; + }; + + const ctx = makeCtx({ agentLoopRun }); + await runAgentLoopImpl(ctx, "hi", "msg-1", () => {}); + + expect(projectAssistantMessageMock).toHaveBeenCalledTimes(1); + // The mock will still receive a `:messages` invalidation from the + // orchestrator's `publishLoopMessagesChanged` — filter to the + // `:metadata` tag and assert it never landed. + const metadataPublishes = ( + publishSyncInvalidationMock.mock.calls as unknown as Array<[string[]]> + ).filter((args) => args[0]?.includes("conversation:test-conv:metadata")); + expect(metadataPublishes).toHaveLength(0); + }); + + test("handleLlmCallStarted deletes a stranded reservation before reserving a new row", async () => { + // Simulates a retry path: the first LLM call reserves an assistant row + // but exits without `message_complete` (e.g. context-overflow rescue, + // ordering-error rescue, image-overflow rescue). The next + // `llm_call_started` must delete the stranded row so the transcript + // does not accumulate empty assistant bubbles. + reserveMessageMock + .mockImplementationOnce(async () => ({ id: "msg-strand-A" })) + .mockImplementationOnce(async () => ({ id: "msg-strand-B" })); + // Indexer/projector mocks default to no-op; no finalized row in this + // test, so `mockMessageById` stays null. + + const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // First LLM call: reserve msg-strand-A, never finalize. + await onEvent({ type: "llm_call_started" }); + // Second LLM call: should delete msg-strand-A before reserving + // msg-strand-B. + await onEvent({ type: "llm_call_started" }); + // Finalize the second one so the loop has a valid assistant message + // and exits cleanly. + onEvent({ + type: "message_complete", + message: { + role: "assistant", + content: [{ type: "text", text: "retry succeeded" }], + }, + }); + onEvent({ + type: "usage", + inputTokens: 5, + outputTokens: 3, + model: "test", + providerDurationMs: 25, + }); + return [ + ...messages, + { + role: "assistant" as const, + content: [ + { type: "text", text: "retry succeeded" }, + ] as ContentBlock[], + }, + ]; + }; + + const ctx = makeCtx({ agentLoopRun }); + await runAgentLoopImpl(ctx, "hi", "msg-1", () => {}); + + // Exactly one delete fires — for msg-strand-A, before the second + // reserve. The second reservation is committed via `updateContent` + // (not deleted), and after the run completes + // `assistantRowAwaitingFinalization` is false, so no further delete + // is attempted on shutdown. + expect(deleteMessageByIdMock).toHaveBeenCalledTimes(1); + const strandDeleteCall = deleteMessageByIdMock.mock + .calls[0] as unknown as [string]; + expect(strandDeleteCall[0]).toBe("msg-strand-A"); + expect(reserveMessageMock).toHaveBeenCalledTimes(2); + }); + + test("provider-error branch deletes the orphaned reservation and repoints lastAssistantMessageId", async () => { + // Codex P2 regression: B3 reserves an empty assistant row at + // `llm_call_started`. When the call exits via the provider-error + // branch (no `message_complete`), the synthetic error message is + // inserted separately. Without cleanup the transcript would carry + // both the empty reserved row AND the error message, and + // `syncLastAssistantMessageToDisk` (which reads + // `state.lastAssistantMessageId`) would mis-target the deleted + // reservation id. + reserveMessageMock.mockImplementationOnce(async () => ({ + id: "msg-orphaned-reservation", + })); + + const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // Reserve the orphan. + await onEvent({ type: "llm_call_started" }); + // Provider rejects — writes the llm_request_log row and arms + // `state.providerErrorUserMessage` via `handleError`. + onEvent({ + type: "provider_error", + error: new Error("upstream 500"), + rawRequest: { model: "gpt-4.1", messages: [] }, + actualProvider: "openai", + }); + onEvent({ + type: "error", + error: new Error("upstream 500"), + }); + // No assistant message in the result — the synthetic-error branch + // below the agent loop fires. + return messages; + }; + + const ctx = makeCtx({ agentLoopRun }); + await runAgentLoopImpl(ctx, "hi", "msg-1", () => {}); + + // The orphan was deleted exactly once, before the synthetic error + // message landed. + expect(deleteMessageByIdMock).toHaveBeenCalledTimes(1); + const deleteCall = deleteMessageByIdMock.mock.calls[0] as unknown as [ + string, + ]; + expect(deleteCall[0]).toBe("msg-orphaned-reservation"); + + // Post-loop `syncLastAssistantMessageToDisk` targets the synthetic + // error row's id (`mock-msg-id` from the mocked `addMessage`), NOT + // the deleted reservation id. This is the externally-observable + // proof that `state.lastAssistantMessageId` was repointed. + expect(syncMessageToDiskMock).toHaveBeenCalled(); + const syncCalls = syncMessageToDiskMock.mock.calls as unknown as Array< + [string, string, number] + >; + const lastSync = syncCalls[syncCalls.length - 1]; + expect(lastSync?.[1]).toBe("mock-msg-id"); + expect(lastSync?.[1]).not.toBe("msg-orphaned-reservation"); + }); + }); + describe("pkbSystemReminderBlock metadata persistence", () => { test("persists pkbSystemReminderBlock in full mode with PKB active", async () => { const reminder = "\npkb content\n"; diff --git a/assistant/src/agent/loop.ts b/assistant/src/agent/loop.ts index 981c5389da1..bc358113e79 100644 --- a/assistant/src/agent/loop.ts +++ b/assistant/src/agent/loop.ts @@ -660,15 +660,6 @@ export class AgentLoop { ); rlog.info({ turn: toolUseTurns }, "LLM call start"); - // Announce the LLM-call boundary so downstream handlers (the - // daemon's persistence pipeline) can reserve an empty assistant row - // and stamp the resulting `messageId` onto every streaming event the - // call emits. Awaited so the row is created and the - // `assistant_turn_start` wire event reaches the client BEFORE the - // provider starts streaming deltas — the deltas downstream will - // carry the freshly-reserved id. - await onEvent({ type: "llm_call_started", callSite }); - // Strip image contentBlocks from older tool results to prevent // screenshots from accumulating in the context window. The LLM // already saw each image on the turn it was captured; keeping @@ -769,6 +760,18 @@ export class AgentLoop { toolUseTurns, ); + // Announce the LLM-call boundary so downstream handlers (the + // daemon's persistence pipeline) can reserve an empty assistant row + // and stamp the resulting `messageId` onto every streaming event the + // call emits. Emit as late as possible — after history stripping, + // arg construction, and turn-context resolution — so the gap + // between "we said the call started" and the actual provider HTTP + // call is minimized. Awaited so the row is created and the + // `assistant_turn_start` wire event reaches the client BEFORE the + // provider starts streaming deltas — the deltas downstream will + // carry the freshly-reserved id. + await onEvent({ type: "llm_call_started", callSite }); + // Inner try/catch narrows error-recording scope to the provider // call itself. The outer agent-loop catch (below) wraps the entire // turn body (tool execution, plugin pipelines, checkpoints), so diff --git a/assistant/src/daemon/conversation-agent-loop-handlers.ts b/assistant/src/daemon/conversation-agent-loop-handlers.ts index 1c0e3f8682b..be07c6e09d2 100644 --- a/assistant/src/daemon/conversation-agent-loop-handlers.ts +++ b/assistant/src/daemon/conversation-agent-loop-handlers.ts @@ -16,12 +16,16 @@ import type { import { getConfig } from "../config/loader.js"; import { recordEstimate } from "../context/estimator-calibration.js"; import { getCalibrationProviderKey } from "../context/token-estimator.js"; +import { projectAssistantMessage } from "../memory/conversation-attention-store.js"; import { + deleteMessageById, getConversation, getMessageById, + messageMetadataSchema, provenanceFromTrustContext, updateMessageContent, } from "../memory/conversation-crud.js"; +import { indexMessageNow } from "../memory/indexer.js"; import { backfillMessageIdOnLogs, buildProviderErrorResponsePayload, @@ -47,6 +51,7 @@ import type { } from "../plugins/types.js"; import type { ContentBlock, ImageContent } from "../providers/types.js"; import { isContextOverflowError } from "../providers/types.js"; +import { publishSyncInvalidation } from "../runtime/sync/sync-publisher.js"; import { redactSecrets } from "../security/secret-scanner.js"; import { extractDomain } from "../tools/network/domain-normalize.js"; import { ProviderError } from "../util/errors.js"; @@ -66,6 +71,7 @@ import { import { isProviderOrderingError } from "./conversation-slash.js"; import { resolveTurnTimezoneContext } from "./date-context.js"; import type { ServerMessage } from "./message-protocol.js"; +import { conversationMetadataSyncTag } from "./message-types/sync.js"; import type { WebSearchMetadata, WebSearchResultItem, @@ -145,6 +151,21 @@ export interface EventHandlerState { contextTooLargeError: unknown; providerErrorUserMessage: string | null; lastAssistantMessageId: string | undefined; + /** + * True when `handleLlmCallStarted` has reserved an empty assistant row + * that has NOT yet been finalized via `handleMessageComplete` + * (`op:"updateContent"` + indexing + projection). Used by error/retry + * paths to detect a stranded reservation that must be cleaned up + * before the next LLM call reserves a fresh row — without it, every + * retryable failure (overflow, ordering, image overflow) and every + * terminal provider rejection would leak an empty assistant bubble + * into the transcript and mispoint downstream sync/projection. + * + * Cleared by `handleMessageComplete` on successful finalize, and by + * the synthetic-error branch in `conversation-agent-loop.ts` after it + * absorbs the reserved row into the error message. + */ + assistantRowAwaitingFinalization: boolean; readonly pendingToolResults: Map; readonly persistedToolUseIds: Set; readonly accumulatedDirectives: DirectiveRequest[]; @@ -245,6 +266,7 @@ export function createEventHandlerState(): EventHandlerState { contextTooLargeError: null, providerErrorUserMessage: null, lastAssistantMessageId: undefined, + assistantRowAwaitingFinalization: false, pendingToolResults: new Map(), persistedToolUseIds: new Set(), accumulatedDirectives: [], @@ -466,6 +488,33 @@ export async function handleLlmCallStarted( state: EventHandlerState, deps: EventHandlerDeps, ): Promise { + // Clean up an orphaned reservation from a previous LLM call in this run + // that errored before `message_complete` could finalize it. This covers + // the retryable paths (overflow, ordering, image overflow) where the + // agent loop re-enters with a fresh `run()` and reserves another row; + // without this delete the failed-attempt row stays in the transcript as + // an empty assistant bubble. The finalized-row case is filtered out via + // the `assistantRowAwaitingFinalization` flag — `handleMessageComplete` + // clears it after the successful `updateContent`, so the previous call's + // committed row is never touched here. + // + // Direct `deleteMessageById` (not via the `persistence` pipeline) is + // intentional: a never-finalized reservation has no segments, no + // attachments, and no observable history — undoing it isn't a real + // persistence event for plugins to react to, so routing through the + // pipeline would only widen the mock surface for no observability win. + if (state.assistantRowAwaitingFinalization && state.lastAssistantMessageId) { + try { + deleteMessageById(state.lastAssistantMessageId); + } catch (err) { + // Non-fatal: a leaked empty row is preferable to a turn-level throw. + deps.rlog.warn( + { err, messageId: state.lastAssistantMessageId }, + "Failed to clean up stranded reserved assistant row before new reservation", + ); + } + } + const metadata = buildAssistantChannelMetadata(state, deps); const reserveResult = (await runPipeline( "persistence", @@ -481,6 +530,7 @@ export async function handleLlmCallStarted( DEFAULT_TIMEOUTS.persistence, )) as PersistReserveResult; state.lastAssistantMessageId = reserveResult.message.id; + state.assistantRowAwaitingFinalization = true; deps.onEvent({ type: "assistant_turn_start", messageId: reserveResult.message.id, @@ -1155,6 +1205,7 @@ export async function handleMessageComplete( "handleMessageComplete fired without a prior llm_call_started reserving an assistant row", ); } + const contentJson = JSON.stringify(contentForPersistence); await runPipeline( "persistence", getMiddlewaresFor("persistence"), @@ -1162,11 +1213,94 @@ export async function handleMessageComplete( { op: "updateContent", messageId: assistantMessageId, - content: JSON.stringify(contentForPersistence), + content: contentJson, }, buildHandlerTurnContext(deps), DEFAULT_TIMEOUTS.persistence, ); + state.assistantRowAwaitingFinalization = false; + + // ── Indexing + attention projection (restored from the pre-B3 `add` path) ── + // `reserveMessage` + `updateMessageContent` are CRUD-only: they don't run + // the memory indexer or the attention-cursor projector. The pre-B3 path + // wrote the row via `addMessage`, which ran both as side-effects of the + // insert. Calling them here keeps the assistant row's external state + // (Qdrant segments, conversation attention cursor) in lockstep with the + // finalized content. Both are non-fatal — a memory hiccup must not + // escalate a successful generation into a turn-level throw. Indexing + // intentionally fires AFTER `updateContent` succeeds so we never index + // the empty reserved placeholder. + const finalizedRow = getMessageById( + assistantMessageId, + deps.ctx.conversationId, + ); + if (finalizedRow) { + let provenanceTrustClass: + | "guardian" + | "trusted_contact" + | "unknown" + | undefined; + let automated: boolean | undefined; + if (finalizedRow.metadata) { + try { + const parsedMeta = messageMetadataSchema.safeParse( + JSON.parse(finalizedRow.metadata), + ); + if (parsedMeta.success) { + provenanceTrustClass = parsedMeta.data.provenanceTrustClass; + automated = parsedMeta.data.automated; + } + } catch { + // Malformed metadata JSON — fall through with undefined fields, + // matching the legacy behavior in `addMessage`. + } + } + try { + await indexMessageNow( + { + messageId: assistantMessageId, + conversationId: deps.ctx.conversationId, + role: "assistant", + content: contentJson, + createdAt: finalizedRow.createdAt, + scopeId: "default", + provenanceTrustClass, + automated, + }, + getConfig().memory, + ); + } catch (err) { + deps.rlog.warn( + { + err, + conversationId: deps.ctx.conversationId, + messageId: assistantMessageId, + }, + "Failed to index assistant message for memory (non-fatal)", + ); + } + try { + const attentionStateChanged = projectAssistantMessage({ + conversationId: deps.ctx.conversationId, + messageId: assistantMessageId, + messageAt: finalizedRow.createdAt, + }); + if (attentionStateChanged) { + void publishSyncInvalidation([ + conversationMetadataSyncTag(deps.ctx.conversationId), + ]); + } + } catch (err) { + deps.rlog.warn( + { + err, + conversationId: deps.ctx.conversationId, + messageId: assistantMessageId, + }, + "Failed to project assistant message for attention tracking (non-fatal)", + ); + } + } // Backfill message_id on all LLM request logs from this turn. // The agent loop is single-threaded per conversation, so all rows with diff --git a/assistant/src/daemon/conversation-agent-loop.ts b/assistant/src/daemon/conversation-agent-loop.ts index 7217b7eb5f1..a600502708f 100644 --- a/assistant/src/daemon/conversation-agent-loop.ts +++ b/assistant/src/daemon/conversation-agent-loop.ts @@ -59,6 +59,7 @@ import { commitAppTurnChanges } from "../memory/app-git-service.js"; import { getApp, listAppFiles, resolveAppDir } from "../memory/app-store.js"; import { enqueueAutoAnalysisOnCompaction } from "../memory/auto-analysis-enqueue.js"; import { + deleteMessageById, getConversation, getConversationOriginChannel, getConversationOriginInterface, @@ -3170,6 +3171,34 @@ export async function runAgentLoopImpl( !abortController.signal.aborted && !yieldedForHandoff ) { + // Drop any reservation stranded by the failed LLM call before + // inserting the synthetic error message. The B3 pre-allocation + // path reserves an empty assistant row at `llm_call_started`; + // when the call exits through the provider-error branch (no + // `message_complete`), `assistantRowAwaitingFinalization` stays + // true. Without this delete the transcript would carry both the + // empty reserved row AND the error message — and downstream sync + // (`syncLastAssistantMessageToDisk`) would mis-target the empty + // row. After delete we set `lastAssistantMessageId` to the new + // error row's id so the post-loop emission paths still point at + // a real message. + if ( + state.assistantRowAwaitingFinalization && + state.lastAssistantMessageId + ) { + // Direct `deleteMessageById` (not via the `persistence` pipeline): + // see the same rationale on the matching cleanup in + // `handleLlmCallStarted` — an unfinalized reservation has no + // observable history for plugins. + try { + deleteMessageById(state.lastAssistantMessageId); + } catch (err) { + rlog.warn( + { err, messageId: state.lastAssistantMessageId }, + "Failed to clean up stranded reserved assistant row on provider-error path (non-fatal)", + ); + } + } const errChannelMeta = { ...provenanceFromTrustContext(ctx.trustContext), userMessageChannel: capturedTurnChannelContext.userMessageChannel, @@ -3197,6 +3226,15 @@ export async function runAgentLoopImpl( DEFAULT_TIMEOUTS.persistence, )) as PersistAddResult; persistedErrorAssistantMessage = true; + // Repoint `lastAssistantMessageId` at the synthetic error row so the + // post-loop sync, attachment resolution, and `message_complete`/ + // `generation_handoff` emissions all reference a real, persisted + // message id. The previous reservation (if any) was already deleted + // above. Mark finalization complete so the next LLM call in this run + // (or a downstream handler) doesn't try to clean up an id that + // already corresponds to a finalized row. + state.lastAssistantMessageId = errorPersistResult.message.id; + state.assistantRowAwaitingFinalization = false; newMessages.push(errorAssistantMessage); // Pipe the just-assigned message id into any orphaned LLM request log // row(s) for this turn. The success path links rows via From cb7733e0e33dbb9f5a497227391ba939f8f68c07 Mon Sep 17 00:00:00 2001 From: "vellum-apollo-bot[bot]" <242025090+vellum-apollo-bot[bot]@users.noreply.github.com> Date: Thu, 28 May 2026 19:47:06 +0000 Subject: [PATCH 3/3] test(agent-loop-exit-reason): include llm_call_started in max_tokens event sequence MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CI surfaced one failure on the merged branch: the `emits continuation surface event and exits on max_tokens` test asserted the full event sequence via `events.map((e) => e.type).toEqual([...])` and did not include `llm_call_started`, which B3 now emits as a real event right before `provider.sendMessage` so the pre-allocated assistant row id can be stamped onto downstream streaming events. Sequence updated to add `llm_call_started` at the head — the existing `countExitEvents` + `lastExitEvent` assertions still pass unchanged. 11/11 pass in agent-loop-exit-reason.test.ts. --- assistant/src/__tests__/agent-loop-exit-reason.test.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/assistant/src/__tests__/agent-loop-exit-reason.test.ts b/assistant/src/__tests__/agent-loop-exit-reason.test.ts index 7da1079ef22..87ac8d3e9bb 100644 --- a/assistant/src/__tests__/agent-loop-exit-reason.test.ts +++ b/assistant/src/__tests__/agent-loop-exit-reason.test.ts @@ -167,6 +167,7 @@ describe("AgentLoop exit-reason instrumentation", () => { }); expect(events.map((e) => e.type)).toEqual([ + "llm_call_started", "usage", "max_tokens_reached", "message_complete",