diff --git a/assistant/src/__tests__/agent-loop-exit-reason.test.ts b/assistant/src/__tests__/agent-loop-exit-reason.test.ts index 7da1079ef22..87ac8d3e9bb 100644 --- a/assistant/src/__tests__/agent-loop-exit-reason.test.ts +++ b/assistant/src/__tests__/agent-loop-exit-reason.test.ts @@ -167,6 +167,7 @@ describe("AgentLoop exit-reason instrumentation", () => { }); expect(events.map((e) => e.type)).toEqual([ + "llm_call_started", "usage", "max_tokens_reached", "message_complete", diff --git a/assistant/src/__tests__/conversation-abort-tool-results.test.ts b/assistant/src/__tests__/conversation-abort-tool-results.test.ts index 642f955bfa4..21199f9b45d 100644 --- a/assistant/src/__tests__/conversation-abort-tool-results.test.ts +++ b/assistant/src/__tests__/conversation-abort-tool-results.test.ts @@ -139,6 +139,7 @@ mock.module("../memory/conversation-crud.js", () => ({ getMessageById: () => null, getLastUserTimestampBefore: () => 0, reserveMessage: mock(async () => ({ id: "msg-reserve" })), + updateMessageContent: mock(() => {}), })); mock.module("../memory/conversation-queries.js", () => ({ @@ -193,6 +194,9 @@ mock.module("../agent/loop.js", () => ({ onEvent: (event: AgentEvent) => void, _signal?: AbortSignal, ): Promise { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); const history = [...messages]; // Simulate provider response with 2 tool_use blocks diff --git a/assistant/src/__tests__/conversation-agent-loop-overflow.test.ts b/assistant/src/__tests__/conversation-agent-loop-overflow.test.ts index cba8d6d93b6..03464a59cf2 100644 --- a/assistant/src/__tests__/conversation-agent-loop-overflow.test.ts +++ b/assistant/src/__tests__/conversation-agent-loop-overflow.test.ts @@ -736,6 +736,9 @@ describe("session-agent-loop overflow recovery (JARVIS-110)", () => { let agentLoopCallCount = 0; const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); agentLoopCallCount++; if (agentLoopCallCount === 1) { // Simulate: agent makes progress (tool calls + results added) @@ -915,6 +918,9 @@ describe("session-agent-loop overflow recovery (JARVIS-110)", () => { }; const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); callCount++; if (callCount === 1) { // Provider rejects with "prompt is too long: 242201 tokens > 200000" @@ -1037,6 +1043,9 @@ describe("session-agent-loop overflow recovery (JARVIS-110)", () => { }; const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); callCount++; if (callCount === 1) { // Provider rejects: actual tokens 242201, way above estimate of 185k @@ -1162,6 +1171,9 @@ describe("session-agent-loop overflow recovery (JARVIS-110)", () => { }; const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); callCount++; onEvent({ type: "message_complete", @@ -1250,6 +1262,9 @@ describe("session-agent-loop overflow recovery (JARVIS-110)", () => { let agentLoopCallCount = 0; const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); agentLoopCallCount++; if (agentLoopCallCount === 1) { // Agent makes progress (tool calls succeed, messages grow) @@ -1441,6 +1456,9 @@ describe("session-agent-loop overflow recovery (JARVIS-110)", () => { _requestId, onCheckpoint, ) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); agentLoopCallCount++; if (agentLoopCallCount === 1) { @@ -1626,6 +1644,9 @@ describe("session-agent-loop overflow recovery (JARVIS-110)", () => { _requestId, onCheckpoint, ) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); agentLoopCallCount++; if (agentLoopCallCount === 1) { @@ -1801,6 +1822,9 @@ describe("session-agent-loop overflow recovery (JARVIS-110)", () => { _requestId, onCheckpoint, ) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); agentLoopCallCount++; // Every call: simulate tool progress then yield at checkpoint @@ -1962,6 +1986,9 @@ describe("session-agent-loop overflow recovery (JARVIS-110)", () => { _requestId, onCheckpoint, ) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); agentLoopCallCount++; const withProgress: Message[] = [ @@ -2194,6 +2221,9 @@ describe("session-agent-loop overflow recovery (JARVIS-110)", () => { }; const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); onEvent({ type: "message_complete", message: { @@ -2297,6 +2327,9 @@ describe("session-agent-loop overflow recovery (JARVIS-110)", () => { _requestId, onCheckpoint, ) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); agentLoopCallCount++; const withProgress: Message[] = [ diff --git a/assistant/src/__tests__/conversation-agent-loop.test.ts b/assistant/src/__tests__/conversation-agent-loop.test.ts index 6afebd292f8..b5cff4feae6 100644 --- a/assistant/src/__tests__/conversation-agent-loop.test.ts +++ b/assistant/src/__tests__/conversation-agent-loop.test.ts @@ -176,6 +176,12 @@ let mockConversationRow: Record = { title: null, }; let mockMessageById: Record | null = null; +const deleteMessageByIdMock = mock(() => ({ + segmentIds: [], + deletedSummaryIds: [], +})); +const reserveMessageMock = mock(async () => ({ id: "msg-reserve" })); +const updateMessageContentMock = mock(() => {}); mock.module("../memory/conversation-crud.js", () => ({ setConversationOriginChannelIfUnset: () => {}, updateConversationUsage: () => {}, @@ -189,7 +195,7 @@ mock.module("../memory/conversation-crud.js", () => ({ }), getConversationOriginInterface: () => null, addMessage: () => ({ id: "mock-msg-id" }), - deleteMessageById: () => {}, + deleteMessageById: deleteMessageByIdMock, updateConversationContextWindow: () => {}, updateConversationSlackContextWatermark: updateConversationSlackContextWatermarkMock, @@ -197,7 +203,36 @@ mock.module("../memory/conversation-crud.js", () => ({ getConversationOriginChannel: () => null, getMessageById: () => mockMessageById, getLastUserTimestampBefore: () => 0, - reserveMessage: mock(async () => ({ id: "msg-reserve" })), + reserveMessage: reserveMessageMock, + updateMessageContent: updateMessageContentMock, + // The real schema is a Zod object; tests don't exercise validation, + // so a passthrough is sufficient — the production code at + // `handleMessageComplete` only branches on `success` and reads two + // fields off `data`. `safeParse` of an empty object satisfies the + // schema (every field is optional). + messageMetadataSchema: { + safeParse: (input: unknown) => ({ success: true, data: input ?? {} }), + }, +})); + +// The B3 indexing-restoration path imports `indexMessageNow` from +// `../memory/indexer.js` and `projectAssistantMessage` from +// `../memory/conversation-attention-store.js`; without these stubs the +// real modules would try to open a SQLite DB and read a real config. +const indexMessageNowMock = mock(async () => ({ + indexedSegments: 0, + enqueuedJobs: 0, +})); +const projectAssistantMessageMock = mock(() => false); +const publishSyncInvalidationMock = mock(async () => {}); +mock.module("../memory/indexer.js", () => ({ + indexMessageNow: indexMessageNowMock, +})); +mock.module("../memory/conversation-attention-store.js", () => ({ + projectAssistantMessage: projectAssistantMessageMock, +})); +mock.module("../runtime/sync/sync-publisher.js", () => ({ + publishSyncInvalidation: publishSyncInvalidationMock, })); afterAll(() => { @@ -693,6 +728,13 @@ beforeEach(() => { mockSlackChronologicalContext = null; loadSlackChronologicalContextMock.mockClear(); getSlackCompactionWatermarkForPrefixMock.mockClear(); + deleteMessageByIdMock.mockClear(); + reserveMessageMock.mockClear(); + updateMessageContentMock.mockClear(); + indexMessageNowMock.mockClear(); + projectAssistantMessageMock.mockClear(); + publishSyncInvalidationMock.mockClear(); + mockMessageById = null; // Orchestrator pipelines (overflowReduce, persistence, …) run through the // plugin registry; reset and re-register every default so the pipelines // dispatch to middleware backed by the mocked collaborators these tests @@ -784,6 +826,9 @@ describe("session-agent-loop", () => { _requestId, onCheckpoint, ) => { + // Prime the assistant row anchor for LLM call 1 — production code + // emits this from `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); await onEvent({ type: "message_complete", message: { @@ -809,6 +854,9 @@ describe("session-agent-loop", () => { hasToolUse: true, history: messages, }); + // Prime the anchor again for LLM call 2 — multi-call agent turns + // reserve a fresh assistant row per LLM call. + await onEvent({ type: "llm_call_started" }); await onEvent({ type: "message_complete", message: { @@ -1065,6 +1113,9 @@ describe("session-agent-loop", () => { const events: ServerMessage[] = []; const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); // Simulate tool_use + error during execution onEvent({ type: "tool_use", @@ -1114,6 +1165,9 @@ describe("session-agent-loop", () => { const events: ServerMessage[] = []; const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); onEvent({ type: "message_complete", message: { @@ -1174,6 +1228,9 @@ describe("session-agent-loop", () => { }; const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); onEvent({ type: "message_complete", message: { @@ -1239,6 +1296,9 @@ describe("session-agent-loop", () => { }; const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); onEvent({ type: "message_complete", message: { @@ -1321,6 +1381,9 @@ describe("session-agent-loop", () => { }; const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); onEvent({ type: "message_complete", message: { @@ -1389,6 +1452,9 @@ describe("session-agent-loop", () => { }> = []; const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); onEvent({ type: "text_delta", text: "Hi." }); onEvent({ type: "message_complete", @@ -1464,6 +1530,9 @@ describe("session-agent-loop", () => { }> = []; const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); // No text_delta — pure tool-call response onEvent({ type: "message_complete", @@ -1527,6 +1596,9 @@ describe("session-agent-loop", () => { const events: ServerMessage[] = []; const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); onEvent({ type: "message_complete", message: { @@ -1639,6 +1711,9 @@ describe("session-agent-loop", () => { }); const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); onEvent({ type: "message_complete", message: { @@ -1729,6 +1804,11 @@ describe("session-agent-loop", () => { }; const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. Retry branches + // need this on every invocation: each agent-loop iteration reserves + // its own row. + await onEvent({ type: "llm_call_started" }); callCount++; if (callCount === 1) { onEvent({ @@ -1856,6 +1936,11 @@ describe("session-agent-loop", () => { }; const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. Retry branches + // need this on every invocation: each agent-loop iteration reserves + // its own row. + await onEvent({ type: "llm_call_started" }); callCount++; if (callCount === 1) { onEvent({ @@ -1941,6 +2026,11 @@ describe("session-agent-loop", () => { mockOverflowAction = "auto_compress_latest_turn"; const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. Retry branches + // need this on every invocation: each agent-loop iteration reserves + // its own row. + await onEvent({ type: "llm_call_started" }); callCount++; if (callCount <= 2) { onEvent({ @@ -2238,6 +2328,9 @@ describe("session-agent-loop", () => { const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { agentLoopCalls++; + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); onEvent({ type: "message_complete", message: { @@ -2286,6 +2379,11 @@ describe("session-agent-loop", () => { let callCount = 0; const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. Retry branches + // need this on every invocation: each agent-loop iteration reserves + // its own row. + await onEvent({ type: "llm_call_started" }); callCount++; if (callCount === 1) { onEvent({ @@ -2370,6 +2468,11 @@ describe("session-agent-loop", () => { _reqId, onCheckpoint, ) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. Retry branches + // need this on every invocation: each agent-loop iteration reserves + // its own row. + await onEvent({ type: "llm_call_started" }); // Simulate tool use followed by checkpoint onEvent({ type: "tool_use", id: "tu-1", name: "file_read", input: {} }); onEvent({ @@ -2443,6 +2546,11 @@ describe("session-agent-loop", () => { _reqId, onCheckpoint, ) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. Retry branches + // need this on every invocation: each agent-loop iteration reserves + // its own row. + await onEvent({ type: "llm_call_started" }); onEvent({ type: "tool_use", id: "tu-1", name: "file_read", input: {} }); onEvent({ type: "tool_result", @@ -2505,6 +2613,9 @@ describe("session-agent-loop", () => { const abortController = new AbortController(); const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); onEvent({ type: "message_complete", message: { @@ -2565,6 +2676,9 @@ describe("session-agent-loop", () => { resolveAssistantAttachmentsMock.mockClear(); const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); onEvent({ type: "message_complete", message: { @@ -2604,6 +2718,9 @@ describe("session-agent-loop", () => { test("increments turnCount after successful run", async () => { const ctx = makeCtx({ agentLoopRun: async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); onEvent({ type: "message_complete", message: { @@ -2637,6 +2754,9 @@ describe("session-agent-loop", () => { test("clears processing state and abort controller", async () => { const ctx = makeCtx({ agentLoopRun: async (messages, onEvent) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); onEvent({ type: "message_complete", message: { @@ -2700,8 +2820,13 @@ describe("session-agent-loop", () => { const ctx = makeCtx({ agentLoopRun: async ( messages: Message[], - onEvent: (event: AgentEvent) => void, + onEvent: (event: AgentEvent) => void | Promise, ) => { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. Must be + // awaited so the assistant row is reserved before message_complete + // tries to write into it. + await onEvent({ type: "llm_call_started" }); onEvent({ type: "message_complete", message: { @@ -3045,6 +3170,280 @@ describe("session-agent-loop", () => { }); }); + describe("B3 pre-allocation: indexing + cleanup", () => { + test("handleMessageComplete indexes and projects the finalized assistant row", async () => { + // The pre-B3 path inserted assistant rows via `addMessage`, which ran + // the memory indexer and the conversation-attention projector as + // side-effects of the insert. B3 splits the write into + // `reserveMessage` + `updateMessageContent`, both of which are CRUD-only, + // so the indexing + projection calls had to be re-driven explicitly + // after `updateContent` succeeds. Codex P1 caught a regression where + // this path was missing entirely; this test pins it down. + mockMessageById = { + id: "msg-reserve", + conversationId: "test-conv", + createdAt: 1234567, + role: "assistant", + content: "[]", + metadata: null, + }; + // Force attention projection to report a state change so we also + // observe the sync-invalidation publish path on the same turn. + projectAssistantMessageMock.mockImplementationOnce(() => true); + + const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + await onEvent({ type: "llm_call_started" }); + // `message_complete` is awaited so `handleMessageComplete` (and its + // async indexer + projector chain) completes before the next event + // or before the loop returns. Without the await the projector's + // synchronous call still races against the test's assertion phase + // because the indexer's `await` yields microtasks. + await onEvent({ + type: "message_complete", + message: { + role: "assistant", + content: [{ type: "text", text: "indexed reply" }], + }, + }); + onEvent({ + type: "usage", + inputTokens: 10, + outputTokens: 5, + model: "test", + providerDurationMs: 50, + }); + return [ + ...messages, + { + role: "assistant" as const, + content: [ + { type: "text", text: "indexed reply" }, + ] as ContentBlock[], + }, + ]; + }; + + const ctx = makeCtx({ agentLoopRun }); + await runAgentLoopImpl(ctx, "hi", "msg-1", () => {}); + + // Indexer fired with the reserved row's id + the finalized content. + expect(indexMessageNowMock).toHaveBeenCalledTimes(1); + const indexCallArgs = indexMessageNowMock.mock.calls[0] as unknown as [ + { + messageId: string; + conversationId: string; + role: string; + content: string; + createdAt: number; + scopeId: string; + }, + unknown, + ]; + const indexCall = indexCallArgs[0]; + expect(indexCall).toMatchObject({ + messageId: "msg-reserve", + conversationId: "test-conv", + role: "assistant", + createdAt: 1234567, + scopeId: "default", + }); + expect(indexCall.content).toContain("indexed reply"); + + // Attention projector fired with the same row coordinates. + expect(projectAssistantMessageMock).toHaveBeenCalledTimes(1); + const projectCall = projectAssistantMessageMock.mock + .calls[0] as unknown as [ + { conversationId: string; messageId: string; messageAt: number }, + ]; + expect(projectCall[0]).toEqual({ + conversationId: "test-conv", + messageId: "msg-reserve", + messageAt: 1234567, + }); + + // Projection reported a state change → sync invalidation fires with + // the conversation `:metadata` tag. The mock also receives a + // `:messages` invalidation from the orchestrator's + // `publishLoopMessagesChanged` post-loop emit, so we filter by tag + // rather than asserting a total call count. + const metadataPublishes = ( + publishSyncInvalidationMock.mock.calls as unknown as Array<[string[]]> + ).filter((args) => args[0]?.includes("conversation:test-conv:metadata")); + expect(metadataPublishes).toHaveLength(1); + }); + + test("handleMessageComplete skips sync invalidation when attention state unchanged", async () => { + // Mirror of the previous test but with the default projector return + // (`false`). The projection still runs every turn, but the sync + // invalidation publish must be gated on attention-state movement to + // avoid flooding clients with no-op metadata refreshes. + mockMessageById = { + id: "msg-reserve", + conversationId: "test-conv", + createdAt: 999, + role: "assistant", + content: "[]", + metadata: null, + }; + + const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + await onEvent({ type: "llm_call_started" }); + // See sibling test — `message_complete` must be awaited so the + // projector call lands before the assertion phase. + await onEvent({ + type: "message_complete", + message: { + role: "assistant", + content: [{ type: "text", text: "quiet" }], + }, + }); + onEvent({ + type: "usage", + inputTokens: 1, + outputTokens: 1, + model: "test", + providerDurationMs: 1, + }); + return [ + ...messages, + { + role: "assistant" as const, + content: [{ type: "text", text: "quiet" }] as ContentBlock[], + }, + ]; + }; + + const ctx = makeCtx({ agentLoopRun }); + await runAgentLoopImpl(ctx, "hi", "msg-1", () => {}); + + expect(projectAssistantMessageMock).toHaveBeenCalledTimes(1); + // The mock will still receive a `:messages` invalidation from the + // orchestrator's `publishLoopMessagesChanged` — filter to the + // `:metadata` tag and assert it never landed. + const metadataPublishes = ( + publishSyncInvalidationMock.mock.calls as unknown as Array<[string[]]> + ).filter((args) => args[0]?.includes("conversation:test-conv:metadata")); + expect(metadataPublishes).toHaveLength(0); + }); + + test("handleLlmCallStarted deletes a stranded reservation before reserving a new row", async () => { + // Simulates a retry path: the first LLM call reserves an assistant row + // but exits without `message_complete` (e.g. context-overflow rescue, + // ordering-error rescue, image-overflow rescue). The next + // `llm_call_started` must delete the stranded row so the transcript + // does not accumulate empty assistant bubbles. + reserveMessageMock + .mockImplementationOnce(async () => ({ id: "msg-strand-A" })) + .mockImplementationOnce(async () => ({ id: "msg-strand-B" })); + // Indexer/projector mocks default to no-op; no finalized row in this + // test, so `mockMessageById` stays null. + + const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // First LLM call: reserve msg-strand-A, never finalize. + await onEvent({ type: "llm_call_started" }); + // Second LLM call: should delete msg-strand-A before reserving + // msg-strand-B. + await onEvent({ type: "llm_call_started" }); + // Finalize the second one so the loop has a valid assistant message + // and exits cleanly. + onEvent({ + type: "message_complete", + message: { + role: "assistant", + content: [{ type: "text", text: "retry succeeded" }], + }, + }); + onEvent({ + type: "usage", + inputTokens: 5, + outputTokens: 3, + model: "test", + providerDurationMs: 25, + }); + return [ + ...messages, + { + role: "assistant" as const, + content: [ + { type: "text", text: "retry succeeded" }, + ] as ContentBlock[], + }, + ]; + }; + + const ctx = makeCtx({ agentLoopRun }); + await runAgentLoopImpl(ctx, "hi", "msg-1", () => {}); + + // Exactly one delete fires — for msg-strand-A, before the second + // reserve. The second reservation is committed via `updateContent` + // (not deleted), and after the run completes + // `assistantRowAwaitingFinalization` is false, so no further delete + // is attempted on shutdown. + expect(deleteMessageByIdMock).toHaveBeenCalledTimes(1); + const strandDeleteCall = deleteMessageByIdMock.mock + .calls[0] as unknown as [string]; + expect(strandDeleteCall[0]).toBe("msg-strand-A"); + expect(reserveMessageMock).toHaveBeenCalledTimes(2); + }); + + test("provider-error branch deletes the orphaned reservation and repoints lastAssistantMessageId", async () => { + // Codex P2 regression: B3 reserves an empty assistant row at + // `llm_call_started`. When the call exits via the provider-error + // branch (no `message_complete`), the synthetic error message is + // inserted separately. Without cleanup the transcript would carry + // both the empty reserved row AND the error message, and + // `syncLastAssistantMessageToDisk` (which reads + // `state.lastAssistantMessageId`) would mis-target the deleted + // reservation id. + reserveMessageMock.mockImplementationOnce(async () => ({ + id: "msg-orphaned-reservation", + })); + + const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { + // Reserve the orphan. + await onEvent({ type: "llm_call_started" }); + // Provider rejects — writes the llm_request_log row and arms + // `state.providerErrorUserMessage` via `handleError`. + onEvent({ + type: "provider_error", + error: new Error("upstream 500"), + rawRequest: { model: "gpt-4.1", messages: [] }, + actualProvider: "openai", + }); + onEvent({ + type: "error", + error: new Error("upstream 500"), + }); + // No assistant message in the result — the synthetic-error branch + // below the agent loop fires. + return messages; + }; + + const ctx = makeCtx({ agentLoopRun }); + await runAgentLoopImpl(ctx, "hi", "msg-1", () => {}); + + // The orphan was deleted exactly once, before the synthetic error + // message landed. + expect(deleteMessageByIdMock).toHaveBeenCalledTimes(1); + const deleteCall = deleteMessageByIdMock.mock.calls[0] as unknown as [ + string, + ]; + expect(deleteCall[0]).toBe("msg-orphaned-reservation"); + + // Post-loop `syncLastAssistantMessageToDisk` targets the synthetic + // error row's id (`mock-msg-id` from the mocked `addMessage`), NOT + // the deleted reservation id. This is the externally-observable + // proof that `state.lastAssistantMessageId` was repointed. + expect(syncMessageToDiskMock).toHaveBeenCalled(); + const syncCalls = syncMessageToDiskMock.mock.calls as unknown as Array< + [string, string, number] + >; + const lastSync = syncCalls[syncCalls.length - 1]; + expect(lastSync?.[1]).toBe("mock-msg-id"); + expect(lastSync?.[1]).not.toBe("msg-orphaned-reservation"); + }); + }); + describe("pkbSystemReminderBlock metadata persistence", () => { test("persists pkbSystemReminderBlock in full mode with PKB active", async () => { const reminder = "\npkb content\n"; @@ -3909,6 +4308,11 @@ describe("session-agent-loop", () => { let callCount = 0; const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { callCount++; + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. Retry branches + // need this on every invocation: each agent-loop iteration reserves + // its own row. + await onEvent({ type: "llm_call_started" }); if (callCount === 1) { // Trigger convergence path: error + appended assistant message so // updatedHistory.length > preRunHistoryLength at the strip site. @@ -3989,6 +4393,11 @@ describe("session-agent-loop", () => { let callCount = 0; const agentLoopRun: AgentLoopRun = async (messages, onEvent) => { callCount++; + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. Retry branches + // need this on every invocation: each agent-loop iteration reserves + // its own row. + await onEvent({ type: "llm_call_started" }); if (callCount === 1) { onEvent({ type: "error", diff --git a/assistant/src/__tests__/conversation-provider-retry-repair.test.ts b/assistant/src/__tests__/conversation-provider-retry-repair.test.ts index 286d0de9b0c..240a17987f8 100644 --- a/assistant/src/__tests__/conversation-provider-retry-repair.test.ts +++ b/assistant/src/__tests__/conversation-provider-retry-repair.test.ts @@ -186,6 +186,7 @@ mock.module("../memory/conversation-crud.js", () => ({ getMessageById: () => null, getLastUserTimestampBefore: () => 0, reserveMessage: mock(async () => ({ id: "msg-reserve" })), + updateMessageContent: mock(() => {}), })); mock.module("../memory/conversation-queries.js", () => ({ @@ -278,6 +279,9 @@ mock.module("../agent/loop.js", () => ({ onEvent: (event: AgentEvent) => void, _signal?: AbortSignal, ): Promise { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); agentLoopRunCount++; if ( diff --git a/assistant/src/__tests__/conversation-queue.test.ts b/assistant/src/__tests__/conversation-queue.test.ts index f62da7d1082..98cd2ef43c9 100644 --- a/assistant/src/__tests__/conversation-queue.test.ts +++ b/assistant/src/__tests__/conversation-queue.test.ts @@ -194,6 +194,7 @@ mock.module("../memory/conversation-crud.js", () => ({ getMessageById: () => null, getLastUserTimestampBefore: () => 0, reserveMessage: mock(async () => ({ id: "msg-reserve" })), + updateMessageContent: mock(() => {}), })); mock.module("../memory/conversation-queries.js", () => ({ @@ -316,7 +317,7 @@ interface PendingRun { resolve: (history: Message[]) => void; reject: (err: Error) => void; messages: Message[]; - onEvent: (event: AgentEvent) => void; + onEvent: (event: AgentEvent) => void | Promise; onCheckpoint?: ( checkpoint: CheckpointInfo, ) => CheckpointDecision | Promise; @@ -338,7 +339,7 @@ mock.module("../agent/loop.js", () => ({ } async run( messages: Message[], - onEvent: (event: AgentEvent) => void, + onEvent: (event: AgentEvent) => void | Promise, _signal?: AbortSignal, _requestId?: string, onCheckpoint?: ( @@ -472,7 +473,7 @@ async function waitForCondition( * that `runAgentLoop` expects (usage + message_complete) so the conversation * cleanly transitions out of its processing state. */ -function resolveRun(index: number) { +async function resolveRun(index: number) { const run = pendingRuns[index]; if (!run) throw new Error(`No pending run at index ${index}`); // Emit the events runAgentLoop expects @@ -480,14 +481,17 @@ function resolveRun(index: number) { role: "assistant", content: [{ type: "text", text: `reply-${index}` }], }; - run.onEvent({ + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await run.onEvent({ type: "llm_call_started" }); + await run.onEvent({ type: "usage", inputTokens: 10, outputTokens: 5, model: "mock", providerDurationMs: 100, }); - run.onEvent({ type: "message_complete", message: assistantMsg }); + await run.onEvent({ type: "message_complete", message: assistantMsg }); // Return updated history with the assistant message appended run.resolve([...run.messages, assistantMsg]); } @@ -545,7 +549,7 @@ describe("Conversation message queue", () => { expect(conversation.getQueueDepth()).toBe(1); // Complete the first message - resolveRun(0); + await resolveRun(0); await p1; // After the first run resolves, the queue drains and triggers a second run. @@ -558,7 +562,7 @@ describe("Conversation message queue", () => { expect(pendingRuns.length).toBe(2); // Complete the second run - resolveRun(1); + await resolveRun(1); await new Promise((r) => setTimeout(r, 10)); }); @@ -585,7 +589,7 @@ describe("Conversation message queue", () => { expect(conversation.getQueueDepth()).toBe(2); // Complete run 0 → drain pulls msg-2 and msg-3 into ONE batched run. - resolveRun(0); + await resolveRun(0); await p1; await waitForPendingRun(2); @@ -624,7 +628,7 @@ describe("Conversation message queue", () => { expect(combinedUserText).toContain("msg-3"); // Resolve the batched run; message_complete must fan out to both clients. - resolveRun(1); + await resolveRun(1); await new Promise((r) => setTimeout(r, 10)); expect(events2.some((e) => e.type === "message_complete")).toBe(true); @@ -651,7 +655,7 @@ describe("Conversation message queue", () => { expect(result.queued).toBe(true); // Complete first - resolveRun(0); + await resolveRun(0); await p1; await waitForPendingRun(2); @@ -665,7 +669,7 @@ describe("Conversation message queue", () => { }); // Complete second run so the conversation finishes cleanly - resolveRun(1); + await resolveRun(1); await new Promise((r) => setTimeout(r, 10)); }); @@ -771,7 +775,7 @@ describe("Conversation message queue", () => { // Complete first → drain pulls all three same-interface passthroughs // into a single batched run (depth → 0, runs → 2 total). - resolveRun(0); + await resolveRun(0); await p1; await waitForPendingRun(2); @@ -779,7 +783,7 @@ describe("Conversation message queue", () => { expect(pendingRuns.length).toBe(2); // Complete the batched run; conversation finishes cleanly. - resolveRun(1); + await resolveRun(1); await new Promise((r) => setTimeout(r, 10)); }); @@ -808,7 +812,7 @@ describe("Conversation message queue", () => { // Complete first message — triggers drain. The empty message should fail // to persist, but the drain should continue to msg-3. - resolveRun(0); + await resolveRun(0); await p1; // msg-3 should have been dequeued and started a new AgentLoop.run @@ -825,7 +829,7 @@ describe("Conversation message queue", () => { expect(events3.some((e) => e.type === "message_dequeued")).toBe(true); // Complete the third message's run - resolveRun(1); + await resolveRun(1); await new Promise((r) => setTimeout(r, 10)); // msg-3 should have completed successfully @@ -900,7 +904,7 @@ describe("Batched drain", () => { expect(conversation.getQueueDepth()).toBe(4); // Resolve msg-1 → batched run pulls macos msg-2 + msg-3. - resolveRun(0); + await resolveRun(0); await p1; await waitForPendingRun(2); @@ -928,7 +932,7 @@ describe("Batched drain", () => { ); // Resolve the batched run → drain pulls the cli single-message run. - resolveRun(1); + await resolveRun(1); await waitForPendingRun(3); // cli run contains msg-4 as a single-message run. @@ -943,7 +947,7 @@ describe("Batched drain", () => { ); // Resolve the cli run → drain pulls the final macos single-message run. - resolveRun(2); + await resolveRun(2); await waitForPendingRun(4); const finalHistory = pendingRuns[3].messages; const finalUserText = finalHistory @@ -958,7 +962,7 @@ describe("Batched drain", () => { // Four total runs: msg-1, batched [msg-2, msg-3], msg-4, msg-5. expect(pendingRuns.length).toBe(4); - resolveRun(3); + await resolveRun(3); await new Promise((r) => setTimeout(r, 10)); }); @@ -1000,7 +1004,7 @@ describe("Batched drain", () => { // Resolve msg-1 → drain pulls "hello" as its own run (batch stops at // /compact boundary). - resolveRun(0); + await resolveRun(0); await p1; await waitForPendingRun(2); @@ -1011,7 +1015,7 @@ describe("Batched drain", () => { // Resolve "hello" → drain pops /compact via the builder-rejected path, // runs its short-circuit (no new runAgentLoop), then drains "world". - resolveRun(1); + await resolveRun(1); await waitForPendingRun(3); // /compact should have emitted its own message_complete via the short- @@ -1020,7 +1024,7 @@ describe("Batched drain", () => { expect(eventsWorld.some((e) => e.type === "message_dequeued")).toBe(true); expect(pendingRuns.length).toBe(3); - resolveRun(2); + await resolveRun(2); await new Promise((r) => setTimeout(r, 10)); }); @@ -1074,7 +1078,7 @@ describe("Batched drain", () => { // Resolve msg-1 → drain pulls "plain-a" as its own run (batch stops at // the /status boundary). - resolveRun(0); + await resolveRun(0); await p1; await waitForPendingRun(2); @@ -1087,7 +1091,7 @@ describe("Batched drain", () => { // runs its unknown-slash short-circuit (no new runAgentLoop, emits // assistant_text_delta + message_complete inline), then drains "plain-b" // as its own run. - resolveRun(1); + await resolveRun(1); await waitForPendingRun(3); // /status should have emitted its own assistant_text_delta + message_complete @@ -1101,7 +1105,7 @@ describe("Batched drain", () => { // without a runAgentLoop invocation. expect(pendingRuns.length).toBe(3); - resolveRun(2); + await resolveRun(2); await new Promise((r) => setTimeout(r, 10)); }); @@ -1137,7 +1141,7 @@ describe("Batched drain", () => { conversation.enqueueMessage("with-B", attachB, () => {}, "req-B"); expect(conversation.getQueueDepth()).toBe(2); - resolveRun(0); + await resolveRun(0); await p1; await waitForPendingRun(2); @@ -1171,7 +1175,7 @@ describe("Batched drain", () => { expect(allText).toContain("a.png"); expect(allText).toContain("b.png"); - resolveRun(1); + await resolveRun(1); await new Promise((r) => setTimeout(r, 10)); }); @@ -1232,13 +1236,13 @@ describe("Batched drain", () => { } // Complete in-flight → drain pulls both queued passthroughs as ONE batched run. - resolveRun(0); + await resolveRun(0); await p1; await waitForPendingRun(2); expect(conversation.getQueueDepth()).toBe(0); // Resolve the batched run. - resolveRun(1); + await resolveRun(1); await new Promise((r) => setTimeout(r, 10)); // After the full drain, the byte budget must be fully reclaimed — a fresh @@ -1255,10 +1259,10 @@ describe("Batched drain", () => { .queued, ).toBe(true); - resolveRun(2); + await resolveRun(2); await p2; await waitForPendingRun(4); - resolveRun(3); + await resolveRun(3); await new Promise((r) => setTimeout(r, 10)); }); }); @@ -1308,7 +1312,7 @@ describe("Batched drain correctness fixes", () => { // Complete run 0 → drain must NOT batch the surface-action with the // regular passthrough. Expect the surface-action to drain as a single // run first. - resolveRun(0); + await resolveRun(0); await p1; await waitForPendingRun(2); @@ -1323,7 +1327,7 @@ describe("Batched drain correctness fixes", () => { // Complete the surface-action run; drain pulls the regular passthrough // as its own separate run. - resolveRun(1); + await resolveRun(1); await waitForPendingRun(3); expect(pendingRuns.length).toBe(3); expect( @@ -1332,7 +1336,7 @@ describe("Batched drain correctness fixes", () => { // Total runs = 3: msg-1, surface-action, regular — NOT 2 (would mean // they were batched). - resolveRun(2); + await resolveRun(2); await new Promise((r) => setTimeout(r, 10)); }); @@ -1383,7 +1387,7 @@ describe("Batched drain correctness fixes", () => { ).length; // Complete run 0 → drain pulls the sibling batch. - resolveRun(0); + await resolveRun(0); await p1; // Give the drain loop a chance to iterate. Abort happens on msg-3's @@ -1448,7 +1452,7 @@ describe("Batched drain correctness fixes", () => { expect(conversation.getQueueDepth()).toBe(3); // Complete run 0 → batched drain. - resolveRun(0); + await resolveRun(0); await p1; await waitForPendingRun(2); @@ -1465,7 +1469,7 @@ describe("Batched drain correctness fixes", () => { ).toBe("req-tail"); // Cleanup: resolve the batched run. - resolveRun(1); + await resolveRun(1); await new Promise((r) => setTimeout(r, 20)); }); @@ -1512,12 +1516,12 @@ describe("Batched drain correctness fixes", () => { "req-fanout-tail", ); - resolveRun(0); + await resolveRun(0); await p1; await waitForPendingRun(2); // Drive the batched run to emit message_complete via fanOutOnEvent. - resolveRun(1); + await resolveRun(1); await new Promise((r) => setTimeout(r, 20)); expect(events3.find((e) => e.type === "error")).toBeDefined(); @@ -1550,7 +1554,7 @@ describe("Batched drain correctness fixes", () => { conversation.enqueueMessage("msg-4", [], () => {}, "req-4"); // Complete run 0 → drain pulls the batched siblings as ONE run. - resolveRun(0); + await resolveRun(0); await p1; await waitForPendingRun(2); @@ -1570,7 +1574,7 @@ describe("Batched drain correctness fixes", () => { requestId: "req-2", // head's requestId, per the fix }); - resolveRun(1); + await resolveRun(1); await new Promise((r) => setTimeout(r, 10)); }); @@ -1619,9 +1623,9 @@ describe("Conversation queue policy helpers", () => { expect(conversation.hasQueuedMessages()).toBe(true); // Cleanup: resolve the pending run - resolveRun(0); + await resolveRun(0); await waitForPendingRun(2); - resolveRun(1); + await resolveRun(1); await new Promise((r) => setTimeout(r, 10)); }); @@ -1646,7 +1650,7 @@ describe("Conversation queue policy helpers", () => { expect(conversation.canHandoffAtCheckpoint()).toBe(false); // Cleanup - resolveRun(0); + await resolveRun(0); await new Promise((r) => setTimeout(r, 10)); }); @@ -1666,9 +1670,9 @@ describe("Conversation queue policy helpers", () => { expect(conversation.canHandoffAtCheckpoint()).toBe(true); // Cleanup - resolveRun(0); + await resolveRun(0); await waitForPendingRun(2); - resolveRun(1); + await resolveRun(1); await new Promise((r) => setTimeout(r, 10)); }); @@ -1733,7 +1737,7 @@ describe("Conversation checkpoint handoff", () => { expect(decision).toBe("yield"); // Complete the run so the conversation finishes cleanly - resolveRun(0); + await resolveRun(0); await p1; // After yield, the first message should emit generation_handoff @@ -1748,7 +1752,7 @@ describe("Conversation checkpoint handoff", () => { // The queued message should now be draining (second run started) await waitForPendingRun(2); - resolveRun(1); + await resolveRun(1); await new Promise((r) => setTimeout(r, 10)); }); @@ -1776,7 +1780,7 @@ describe("Conversation checkpoint handoff", () => { expect(decision).toBe("continue"); // Cleanup - resolveRun(0); + await resolveRun(0); await p1; }); @@ -1816,7 +1820,7 @@ describe("Conversation checkpoint handoff", () => { expect(decision).toBe("yield"); // Complete first run - resolveRun(0); + await resolveRun(0); await p1; // The yielded drain pulls ALL THREE queued siblings as ONE batched run — @@ -1830,7 +1834,7 @@ describe("Conversation checkpoint handoff", () => { expect(events4.some((e) => e.type === "message_dequeued")).toBe(true); // Resolve the batched run — message_complete fans out to all three clients. - resolveRun(1); + await resolveRun(1); await new Promise((r) => setTimeout(r, 10)); expect(events2.some((e) => e.type === "message_complete")).toBe(true); @@ -1874,7 +1878,7 @@ describe("Conversation checkpoint handoff", () => { expect(decision).toBe("yield"); // Complete the run (AgentLoop resolves after yielding) - resolveRun(0); + await resolveRun(0); await p1; // Verify generation_handoff was emitted (not plain message_complete) @@ -1897,7 +1901,7 @@ describe("Conversation checkpoint handoff", () => { expect(events2.some((e) => e.type === "message_dequeued")).toBe(true); // Complete the second run - resolveRun(1); + await resolveRun(1); await new Promise((r) => setTimeout(r, 10)); }); @@ -1967,7 +1971,7 @@ describe("Conversation checkpoint handoff", () => { history: [], }), ).toBe("yield"); - resolveRun(0); + await resolveRun(0); await pA; // B should be draining @@ -1984,7 +1988,7 @@ describe("Conversation checkpoint handoff", () => { history: [], }), ).toBe("yield"); - resolveRun(1); + await resolveRun(1); await waitForPendingRun(3); // Handoff from C -> D @@ -1999,7 +2003,7 @@ describe("Conversation checkpoint handoff", () => { history: [], }), ).toBe("yield"); - resolveRun(2); + await resolveRun(2); await waitForPendingRun(4); // D has no more queued -> checkpoint should return 'continue' @@ -2014,7 +2018,7 @@ describe("Conversation checkpoint handoff", () => { }), ).toBe("continue"); - resolveRun(3); + await resolveRun(3); await new Promise((r) => setTimeout(r, 10)); // Verify FIFO dequeue order @@ -2044,7 +2048,7 @@ describe("Conversation checkpoint handoff", () => { expect(conversation.getQueueDepth()).toBe(2); // Complete message A — triggers drain. B should fail, C should proceed. - resolveRun(0); + await resolveRun(0); await pA; // C should have been dequeued and started a new AgentLoop.run @@ -2061,7 +2065,7 @@ describe("Conversation checkpoint handoff", () => { expect(eventsC.some((e) => e.type === "message_dequeued")).toBe(true); // Complete C's run - resolveRun(1); + await resolveRun(1); await new Promise((r) => setTimeout(r, 10)); // C should have completed successfully @@ -2098,7 +2102,7 @@ describe("Conversation checkpoint handoff", () => { expect(pendingRuns[1].onCheckpoint).toBeDefined(); // Complete retry cleanly - resolveRun(1); + await resolveRun(1); await p1; }); }); @@ -2121,7 +2125,7 @@ describe("Conversation usage requestId correlation", () => { await waitForPendingRun(1); // Complete the run — this triggers recordUsage with the request's ID - resolveRun(0); + await resolveRun(0); await p1; // The usage event should carry the request ID, not null @@ -2159,7 +2163,7 @@ describe("Terminal trace events on rejection/failure", () => { conversation.enqueueMessage("msg-3", [], () => {}, "req-3"); // Complete first — triggers drain, empty msg fails persist - resolveRun(0); + await resolveRun(0); await p1; await waitForPendingRun(2); @@ -2174,7 +2178,7 @@ describe("Terminal trace events on rejection/failure", () => { expect(errorTrace).toBeDefined(); // Cleanup - resolveRun(1); + await resolveRun(1); await new Promise((r) => setTimeout(r, 10)); }); }); @@ -2216,6 +2220,7 @@ describe("Conversation host attachment directives", () => { }, ], }; + await run.onEvent({ type: "llm_call_started" }); run.onEvent({ type: "usage", inputTokens: 10, @@ -2285,6 +2290,7 @@ describe("Conversation host attachment directives", () => { }, ], }; + await run.onEvent({ type: "llm_call_started" }); run.onEvent({ type: "usage", inputTokens: 10, @@ -2378,6 +2384,7 @@ describe("Conversation attachment event payloads", () => { } as any, ], }); + await run.onEvent({ type: "llm_call_started" }); run.onEvent({ type: "usage", inputTokens: 10, @@ -2453,6 +2460,7 @@ describe("Conversation attachment event payloads", () => { } as any, ], }); + await run.onEvent({ type: "llm_call_started" }); run.onEvent({ type: "usage", inputTokens: 10, @@ -2479,7 +2487,7 @@ describe("Conversation attachment event payloads", () => { expect(attachments[0].data).toBe("iVBORw0K"); await waitForPendingRun(2); - resolveRun(1); + await resolveRun(1); await new Promise((r) => setTimeout(r, 10)); }); }); @@ -2511,7 +2519,7 @@ describe("Regression: cancel semantics and error channel split", () => { conversation.abort(); // Resolve the pending run so the abort-check path fires - resolveRun(0); + await resolveRun(0); await p1; // generation_cancelled should be emitted via the per-message callback @@ -2557,6 +2565,7 @@ describe("Regression: cancel semantics and error channel split", () => { } as any, ], }); + await run.onEvent({ type: "llm_call_started" }); run.onEvent({ type: "usage", inputTokens: 10, @@ -2681,7 +2690,7 @@ describe("Regression: cancel semantics and error channel split", () => { conversation.enqueueMessage("msg-2", [], (e) => events2.push(e), "req-2"); // Complete the first agent loop run - resolveRun(0); + await resolveRun(0); // The turn should still complete (timeout fires) and drain the queue // even though commitTurnChanges never resolves. @@ -2702,7 +2711,7 @@ describe("Regression: cancel semantics and error channel split", () => { // Complete the second run so the test can clean up turnCommitHangForever = false; - resolveRun(1); + await resolveRun(1); await new Promise((r) => origSetTimeout(r, 10)); } finally { turnCommitHangForever = false; diff --git a/assistant/src/__tests__/conversation-slash-queue.test.ts b/assistant/src/__tests__/conversation-slash-queue.test.ts index ae319f04609..b5f7d78c586 100644 --- a/assistant/src/__tests__/conversation-slash-queue.test.ts +++ b/assistant/src/__tests__/conversation-slash-queue.test.ts @@ -136,6 +136,7 @@ mock.module("../memory/conversation-crud.js", () => ({ getMessageById: () => null, getLastUserTimestampBefore: () => 0, reserveMessage: mock(async () => ({ id: "msg-reserve" })), + updateMessageContent: mock(() => {}), })); mock.module("../memory/conversation-queries.js", () => ({ @@ -205,7 +206,7 @@ mock.module("../config/skill-state.js", () => ({ interface PendingRun { resolve: (history: Message[]) => void; messages: Message[]; - onEvent: (event: AgentEvent) => void; + onEvent: (event: AgentEvent) => void | Promise; } let pendingRuns: PendingRun[] = []; @@ -224,7 +225,7 @@ mock.module("../agent/loop.js", () => ({ } async run( messages: Message[], - onEvent: (event: AgentEvent) => void, + onEvent: (event: AgentEvent) => void | Promise, _signal?: AbortSignal, _requestId?: string, _onCheckpoint?: ( @@ -334,21 +335,24 @@ async function waitForPendingRun( } } -function resolveRun(index: number) { +async function resolveRun(index: number) { const run = pendingRuns[index]; if (!run) throw new Error(`No pending run at index ${index}`); const assistantMsg: Message = { role: "assistant", content: [{ type: "text", text: `reply-${index}` }], }; - run.onEvent({ + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await run.onEvent({ type: "llm_call_started" }); + await run.onEvent({ type: "usage", inputTokens: 10, outputTokens: 5, model: "mock", providerDurationMs: 100, }); - run.onEvent({ type: "message_complete", message: assistantMsg }); + await run.onEvent({ type: "message_complete", message: assistantMsg }); run.resolve([...run.messages, assistantMsg]); } @@ -409,7 +413,7 @@ describe("Conversation queue — slash-like messages pass through to agent loop" expect(conversation.getQueueDepth()).toBe(2); // Complete first run — drain pulls both queued messages into one batched run. - resolveRun(0); + await resolveRun(0); await p1; await waitForPendingRun(2); @@ -420,7 +424,7 @@ describe("Conversation queue — slash-like messages pass through to agent loop" // Exactly 2 runs total: msg-1 + batched [/not-a-skill, msg-3]. expect(pendingRuns.length).toBe(2); - resolveRun(1); + await resolveRun(1); await new Promise((r) => setTimeout(r, 50)); }); @@ -449,7 +453,7 @@ describe("Conversation queue — slash-like messages pass through to agent loop" ); // Complete first run — triggers drain - resolveRun(0); + await resolveRun(0); await p1; await waitForPendingRun(2); @@ -464,7 +468,7 @@ describe("Conversation queue — slash-like messages pass through to agent loop" // Content passes through as-is — no rewriting expect(text).toContain("/start-the-day"); - resolveRun(1); + await resolveRun(1); await new Promise((r) => setTimeout(r, 50)); }); @@ -495,7 +499,7 @@ describe("Conversation queue — slash-like messages pass through to agent loop" expect(conversation.getQueueDepth()).toBe(3); // Resolve msg-1 → drain pulls only "hi" (batch builder stops at /compact). - resolveRun(0); + await resolveRun(0); await p1; await waitForPendingRun(2); @@ -504,14 +508,14 @@ describe("Conversation queue — slash-like messages pass through to agent loop" // Resolve "hi" → /compact short-circuits without a new runAgentLoop, then // drains "bye" as its own run. - resolveRun(1); + await resolveRun(1); await waitForPendingRun(3); expect(eventsCompact.some((e) => e.type === "message_complete")).toBe(true); expect(eventsBye.some((e) => e.type === "message_dequeued")).toBe(true); expect(pendingRuns.length).toBe(3); - resolveRun(2); + await resolveRun(2); await new Promise((r) => setTimeout(r, 50)); }); diff --git a/assistant/src/__tests__/conversation-slash-unknown.test.ts b/assistant/src/__tests__/conversation-slash-unknown.test.ts index 098f6174810..aa4f2049be5 100644 --- a/assistant/src/__tests__/conversation-slash-unknown.test.ts +++ b/assistant/src/__tests__/conversation-slash-unknown.test.ts @@ -137,6 +137,7 @@ mock.module("../memory/conversation-crud.js", () => ({ getMessageById: () => null, getLastUserTimestampBefore: () => 0, reserveMessage: mock(async () => ({ id: "msg-reserve" })), + updateMessageContent: mock(() => {}), })); mock.module("../memory/conversation-queries.js", () => ({ @@ -255,6 +256,9 @@ mock.module("../agent/loop.js", () => ({ checkpoint: CheckpointInfo, ) => CheckpointDecision | Promise, ): Promise { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); agentLoopRunCalled = true; const assistantMsg: Message = { role: "assistant", diff --git a/assistant/src/__tests__/conversation-workspace-injection.test.ts b/assistant/src/__tests__/conversation-workspace-injection.test.ts index c1ccac68ad2..8adadae24c0 100644 --- a/assistant/src/__tests__/conversation-workspace-injection.test.ts +++ b/assistant/src/__tests__/conversation-workspace-injection.test.ts @@ -146,6 +146,7 @@ mock.module("../memory/conversation-crud.js", () => ({ getConversationOverrideProfileFromRow: () => undefined, updateMessageMetadata: () => {}, reserveMessage: mock(async () => ({ id: "msg-reserve" })), + updateMessageContent: mock(() => {}), })); mock.module("../memory/conversation-queries.js", () => ({ @@ -245,6 +246,9 @@ mock.module("../agent/loop.js", () => ({ messages: Message[], onEvent: (event: AgentEvent) => void, ): Promise { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); runCalls.push(messages); agentLoopScript(onEvent); onEvent({ diff --git a/assistant/src/__tests__/conversation-workspace-tool-tracking.test.ts b/assistant/src/__tests__/conversation-workspace-tool-tracking.test.ts index 17b01ce7f4c..7a7dafeb875 100644 --- a/assistant/src/__tests__/conversation-workspace-tool-tracking.test.ts +++ b/assistant/src/__tests__/conversation-workspace-tool-tracking.test.ts @@ -143,6 +143,7 @@ mock.module("../memory/conversation-crud.js", () => ({ getConversationOverrideProfileFromRow: () => undefined, updateMessageMetadata: () => {}, reserveMessage: mock(async () => ({ id: "msg-reserve" })), + updateMessageContent: mock(() => {}), })); mock.module("../memory/conversation-queries.js", () => ({ @@ -229,6 +230,9 @@ mock.module("../agent/loop.js", () => ({ messages: Message[], onEvent: (event: AgentEvent) => void, ): Promise { + // Prime the assistant row anchor — production code emits this from + // `AgentLoop.run` just before `provider.sendMessage`. + await onEvent({ type: "llm_call_started" }); agentLoopScript(onEvent); onEvent({ type: "usage", diff --git a/assistant/src/__tests__/outbound-slack-persistence.test.ts b/assistant/src/__tests__/outbound-slack-persistence.test.ts index 89c25f0cdc5..20072328b74 100644 --- a/assistant/src/__tests__/outbound-slack-persistence.test.ts +++ b/assistant/src/__tests__/outbound-slack-persistence.test.ts @@ -107,11 +107,47 @@ mock.module("../memory/conversation-crud.js", () => ({ : {}; row.metadata = JSON.stringify({ ...existing, ...updates }); }, - updateMessageContent: () => {}, + updateMessageContent: (messageId: string, content: string) => { + // Mirror updateContent into the same capture array so existing + // `lastAssistantPersisted()` assertions continue to find the row that + // was reserved at `llm_call_started` time. + const row = persistedRows.find((candidate) => candidate.id === messageId); + if (row) row.content = content; + const call = addMessageCalls.find((c) => c.id === messageId); + if (call) call.content = content; + }, // The handler treats provenance as a flat spread; returning {} keeps the // metadata snapshot focused on the fields under test. provenanceFromTrustContext: () => ({}), - reserveMessage: mock(async () => ({ id: "msg-reserve" })), + reserveMessage: mock( + async ( + conversationId: string, + role: string, + metadata?: Record, + ) => { + // B3: production code creates the assistant row at `llm_call_started` + // via `reserveMessage`, stamping channel metadata at reserve time. + // Mirror that into the addMessage capture array so existing + // `lastAssistantPersisted()` assertions keep working. + const id = `mock-msg-${addMessageCalls.length + 1}-reserve`; + addMessageCalls.push({ + id, + conversationId, + role, + content: "", + metadata, + }); + persistedRows.push({ + id, + conversationId, + role, + content: "", + createdAt: Date.now(), + metadata: metadata ? JSON.stringify(metadata) : null, + }); + return { id }; + }, + ), })); mock.module("../memory/llm-request-log-store.js", () => ({ @@ -148,6 +184,7 @@ import type { } from "../daemon/conversation-agent-loop-handlers.js"; import { createEventHandlerState, + handleLlmCallStarted, handleMessageComplete, } from "../daemon/conversation-agent-loop-handlers.js"; import type { ServerMessage } from "../daemon/message-protocol.js"; @@ -256,6 +293,7 @@ describe("outbound assistant Slack metadata persistence", () => { assistantMessageChannel: "slack", requesterChatId: channelId, }); + await handleLlmCallStarted(state, deps); await handleMessageComplete(state, deps, makeMessageCompleteEvent("hi")); const persisted = lastAssistantPersisted(); @@ -297,6 +335,7 @@ describe("outbound assistant Slack metadata persistence", () => { requesterTimezoneLabel: "ET", clientTimezone: "America/Los_Angeles", }); + await handleLlmCallStarted(state, deps); await handleMessageComplete( state, deps, @@ -325,6 +364,7 @@ describe("outbound assistant Slack metadata persistence", () => { requesterChatId: channelId, clientTimezone: "America/Los_Angeles", }); + await handleLlmCallStarted(state, deps); await handleMessageComplete( state, deps, @@ -352,6 +392,7 @@ describe("outbound assistant Slack metadata persistence", () => { requesterChatId: channelId, requesterTimezoneLabel: "ET", }); + await handleLlmCallStarted(state, deps); await handleMessageComplete( state, deps, @@ -395,6 +436,7 @@ describe("outbound assistant Slack metadata persistence", () => { assistantMessageChannel: "slack", requesterChatId: channelId, }); + await handleLlmCallStarted(state, deps); await handleMessageComplete(state, deps, makeMessageCompleteEvent("hello")); const persisted = lastAssistantPersisted(); @@ -426,6 +468,7 @@ describe("outbound assistant Slack metadata persistence", () => { assistantMessageChannel: "slack", requesterChatId: channelId, }); + await handleLlmCallStarted(state, deps); await handleMessageComplete( state, deps, @@ -447,6 +490,7 @@ describe("outbound assistant Slack metadata persistence", () => { const deps = makeDeps(conversationId, { assistantMessageChannel: "vellum", }); + await handleLlmCallStarted(state, deps); await handleMessageComplete( state, deps, diff --git a/assistant/src/__tests__/persistence-secret-redaction.test.ts b/assistant/src/__tests__/persistence-secret-redaction.test.ts index ce926835754..798dfba6021 100644 --- a/assistant/src/__tests__/persistence-secret-redaction.test.ts +++ b/assistant/src/__tests__/persistence-secret-redaction.test.ts @@ -40,6 +40,7 @@ mock.module("../config/loader.js", () => ({ })); interface AddMessageCall { + id: string; conversationId: string; role: string; content: string; @@ -53,14 +54,41 @@ mock.module("../memory/conversation-crud.js", () => ({ content: string, metadata?: Record, ) => { - addMessageCalls.push({ conversationId, role, content, metadata }); - return { id: `mock-msg-${addMessageCalls.length}` }; + const id = `mock-msg-${addMessageCalls.length + 1}`; + addMessageCalls.push({ id, conversationId, role, content, metadata }); + return { id }; }, getConversation: () => null, getMessageById: () => null, - updateMessageContent: () => {}, + updateMessageContent: (messageId: string, content: string) => { + // Mirror updateContent into the same capture array so existing + // `lastPersisted("assistant")` assertions continue to find the row that + // was reserved at `llm_call_started` time. + const call = addMessageCalls.find((c) => c.id === messageId); + if (call) call.content = content; + }, provenanceFromTrustContext: () => ({}), - reserveMessage: mock(async () => ({ id: "msg-reserve" })), + reserveMessage: mock( + async ( + conversationId: string, + role: string, + metadata?: Record, + ) => { + // B3: production code creates the assistant row at `llm_call_started` + // via `reserveMessage`, stamping channel metadata at reserve time. + // Mirror that into the addMessage capture array so existing + // `lastPersisted("assistant")` assertions keep working. + const id = `mock-msg-${addMessageCalls.length + 1}-reserve`; + addMessageCalls.push({ + id, + conversationId, + role, + content: "", + metadata, + }); + return { id }; + }, + ), })); mock.module("../memory/llm-request-log-store.js", () => ({ @@ -85,6 +113,7 @@ import type { } from "../daemon/conversation-agent-loop-handlers.js"; import { createEventHandlerState, + handleLlmCallStarted, handleMessageComplete, } from "../daemon/conversation-agent-loop-handlers.js"; @@ -165,7 +194,12 @@ describe("persistence-layer secret redaction", () => { isError: false, }); - await handleMessageComplete(state, makeDeps(), makeMessageCompleteEvent("done")); + await handleLlmCallStarted(state, makeDeps()); + await handleMessageComplete( + state, + makeDeps(), + makeMessageCompleteEvent("done"), + ); const persisted = lastPersisted("user"); const blocks = JSON.parse(persisted.content) as Array<{ @@ -184,7 +218,12 @@ describe("persistence-layer secret redaction", () => { isError: false, }); - await handleMessageComplete(state, makeDeps(), makeMessageCompleteEvent("done")); + await handleLlmCallStarted(state, makeDeps()); + await handleMessageComplete( + state, + makeDeps(), + makeMessageCompleteEvent("done"), + ); const persisted = lastPersisted("user"); const blocks = JSON.parse(persisted.content) as Array<{ @@ -196,13 +235,19 @@ describe("persistence-layer secret redaction", () => { }); test("does not redact non-secret content (UUID, hex hash) in tool result", async () => { - const safe = "id=550e8400-e29b-41d4-a716-446655440000 sha=a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2"; + const safe = + "id=550e8400-e29b-41d4-a716-446655440000 sha=a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2"; state.pendingToolResults.set("tool-use-3", { content: safe, isError: false, }); - await handleMessageComplete(state, makeDeps(), makeMessageCompleteEvent("done")); + await handleLlmCallStarted(state, makeDeps()); + await handleMessageComplete( + state, + makeDeps(), + makeMessageCompleteEvent("done"), + ); const persisted = lastPersisted("user"); const blocks = JSON.parse(persisted.content) as Array<{ @@ -225,7 +270,12 @@ describe("persistence-layer secret redaction", () => { // Capture the content before handleMessageComplete clears pendingToolResults const contentSnapshot = state.pendingToolResults.get("tool-use-4")!.content; - await handleMessageComplete(state, makeDeps(), makeMessageCompleteEvent("done")); + await handleLlmCallStarted(state, makeDeps()); + await handleMessageComplete( + state, + makeDeps(), + makeMessageCompleteEvent("done"), + ); // The snapshot taken from live state before the call must be unmodified expect(contentSnapshot).toBe(originalContent); @@ -240,7 +290,12 @@ describe("persistence-layer secret redaction", () => { "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; const text = `Your API key is \`${secret}\`. Keep it safe.`; - await handleMessageComplete(state, makeDeps(), makeMessageCompleteEvent(text)); + await handleLlmCallStarted(state, makeDeps()); + await handleMessageComplete( + state, + makeDeps(), + makeMessageCompleteEvent(text), + ); const persisted = lastPersisted("assistant"); const blocks = JSON.parse(persisted.content) as Array<{ @@ -257,7 +312,12 @@ describe("persistence-layer secret redaction", () => { const secret = "sk-proj-ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrst"; const text = `I found this key in the config: ${secret}`; - await handleMessageComplete(state, makeDeps(), makeMessageCompleteEvent(text)); + await handleLlmCallStarted(state, makeDeps()); + await handleMessageComplete( + state, + makeDeps(), + makeMessageCompleteEvent(text), + ); const persisted = lastPersisted("assistant"); const blocks = JSON.parse(persisted.content) as Array<{ @@ -272,7 +332,12 @@ describe("persistence-layer secret redaction", () => { test("does not redact non-secret text in assistant message", async () => { const safe = "Here is the file list: index.ts, util.ts, main.ts"; - await handleMessageComplete(state, makeDeps(), makeMessageCompleteEvent(safe)); + await handleLlmCallStarted(state, makeDeps()); + await handleMessageComplete( + state, + makeDeps(), + makeMessageCompleteEvent(safe), + ); const persisted = lastPersisted("assistant"); const blocks = JSON.parse(persisted.content) as Array<{ @@ -287,7 +352,12 @@ describe("persistence-layer secret redaction", () => { // High-entropy but no known credential prefix — should NOT be redacted const text = "checksum: 8f14e45fceea167a5a36dedd4bea2543"; - await handleMessageComplete(state, makeDeps(), makeMessageCompleteEvent(text)); + await handleLlmCallStarted(state, makeDeps()); + await handleMessageComplete( + state, + makeDeps(), + makeMessageCompleteEvent(text), + ); const persisted = lastPersisted("assistant"); const blocks = JSON.parse(persisted.content) as Array<{ diff --git a/assistant/src/agent/loop.ts b/assistant/src/agent/loop.ts index 490cc31b61d..8f97f9cf5e6 100644 --- a/assistant/src/agent/loop.ts +++ b/assistant/src/agent/loop.ts @@ -121,6 +121,15 @@ export type AgentLoopExitReason = | "error"; export type AgentEvent = + /** + * Emitted once per LLM call inside the loop, immediately before the + * `provider.sendMessage` invocation. Carries the optional `callSite` tag so + * downstream handlers (the daemon's persistence pipeline) can decide + * whether to reserve a row for this call. One `llm_call_started` precedes + * every `message_complete` for the same call; multi-call agent turns emit + * one pair per call. + */ + | { type: "llm_call_started"; callSite?: LLMCallSite } | { type: "text_delta"; text: string } | { type: "thinking_delta"; thinking: string } | { type: "message_complete"; message: Message } @@ -766,6 +775,18 @@ export class AgentLoop { toolUseTurns, ); + // Announce the LLM-call boundary so downstream handlers (the + // daemon's persistence pipeline) can reserve an empty assistant row + // and stamp the resulting `messageId` onto every streaming event the + // call emits. Emit as late as possible — after history stripping, + // arg construction, and turn-context resolution — so the gap + // between "we said the call started" and the actual provider HTTP + // call is minimized. Awaited so the row is created and the + // `assistant_turn_start` wire event reaches the client BEFORE the + // provider starts streaming deltas — the deltas downstream will + // carry the freshly-reserved id. + await onEvent({ type: "llm_call_started", callSite }); + // Inner try/catch narrows error-recording scope to the provider // call itself. The outer agent-loop catch (below) wraps the entire // turn body (tool execution, plugin pipelines, checkpoints), so diff --git a/assistant/src/daemon/conversation-agent-loop-handlers.ts b/assistant/src/daemon/conversation-agent-loop-handlers.ts index 9fcaf217d9c..c4003906fc7 100644 --- a/assistant/src/daemon/conversation-agent-loop-handlers.ts +++ b/assistant/src/daemon/conversation-agent-loop-handlers.ts @@ -17,12 +17,16 @@ import type { import { getConfig } from "../config/loader.js"; import { recordEstimate } from "../context/estimator-calibration.js"; import { getCalibrationProviderKey } from "../context/token-estimator.js"; +import { projectAssistantMessage } from "../memory/conversation-attention-store.js"; import { + deleteMessageById, getConversation, getMessageById, + messageMetadataSchema, provenanceFromTrustContext, updateMessageContent, } from "../memory/conversation-crud.js"; +import { indexMessageNow } from "../memory/indexer.js"; import { backfillMessageIdOnLogs, buildProviderErrorResponsePayload, @@ -41,13 +45,14 @@ import { defaultPersistenceTerminal } from "../plugins/defaults/persistence.js"; import { DEFAULT_TIMEOUTS, runPipeline } from "../plugins/pipeline.js"; import { getMiddlewaresFor } from "../plugins/registry.js"; import type { - PersistAddResult, PersistArgs, + PersistReserveResult, PersistResult, TurnContext, } from "../plugins/types.js"; import type { ContentBlock, ImageContent } from "../providers/types.js"; import { isContextOverflowError } from "../providers/types.js"; +import { publishSyncInvalidation } from "../runtime/sync/sync-publisher.js"; import { redactSecrets } from "../security/secret-scanner.js"; import { extractDomain } from "../tools/network/domain-normalize.js"; import { @@ -77,6 +82,7 @@ import type { SurfaceAction, UiSurfaceShow, } from "./message-protocol.js"; +import { conversationMetadataSyncTag } from "./message-types/sync.js"; import type { WebSearchMetadata, WebSearchResultItem, @@ -156,6 +162,21 @@ export interface EventHandlerState { contextTooLargeError: unknown; providerErrorUserMessage: string | null; lastAssistantMessageId: string | undefined; + /** + * True when `handleLlmCallStarted` has reserved an empty assistant row + * that has NOT yet been finalized via `handleMessageComplete` + * (`op:"updateContent"` + indexing + projection). Used by error/retry + * paths to detect a stranded reservation that must be cleaned up + * before the next LLM call reserves a fresh row — without it, every + * retryable failure (overflow, ordering, image overflow) and every + * terminal provider rejection would leak an empty assistant bubble + * into the transcript and mispoint downstream sync/projection. + * + * Cleared by `handleMessageComplete` on successful finalize, and by + * the synthetic-error branch in `conversation-agent-loop.ts` after it + * absorbs the reserved row into the error message. + */ + assistantRowAwaitingFinalization: boolean; readonly pendingToolResults: Map; readonly persistedToolUseIds: Set; readonly accumulatedDirectives: DirectiveRequest[]; @@ -256,6 +277,7 @@ export function createEventHandlerState(): EventHandlerState { contextTooLargeError: null, providerErrorUserMessage: null, lastAssistantMessageId: undefined, + assistantRowAwaitingFinalization: false, pendingToolResults: new Map(), persistedToolUseIds: new Set(), accumulatedDirectives: [], @@ -403,6 +425,133 @@ function resolveAssistantReplyTimestampTimezone( }).effectiveTimezone; } +/** + * Assemble the metadata envelope written to the assistant message row. + * + * Stamped at reserve time (before `provider.sendMessage`) so the row carries + * channel provenance from the moment it lands in SQLite, mirroring the + * snapshot that handleMessageComplete used to compute at end-of-turn. All + * inputs (channel context, trust context, turnStartedAt) are stable across + * the LLM call, so building this once at reserve is equivalent to building + * it at complete. Slack reply rows further stamp a `slackMeta` sub-object — + * the `channelTs` field stays absent here and is back-filled by + * `deliverReplyViaCallback` after the gateway returns the ts. + */ +function buildAssistantChannelMetadata( + state: EventHandlerState, + deps: EventHandlerDeps, +): Record { + const metadata: Record = { + ...provenanceFromTrustContext(deps.ctx.trustContext), + userMessageChannel: deps.turnChannelContext.userMessageChannel, + assistantMessageChannel: deps.turnChannelContext.assistantMessageChannel, + userMessageInterface: deps.turnInterfaceContext.userMessageInterface, + assistantMessageInterface: + deps.turnInterfaceContext.assistantMessageInterface, + sentAt: state.turnStartedAt, + }; + + if (deps.turnChannelContext.assistantMessageChannel === "slack") { + const channelId = deps.ctx.trustContext?.requesterChatId; + if (channelId) { + const threadTs = getThreadTs(deps.ctx.conversationId); + const timestampTimezone = resolveAssistantReplyTimestampTimezone( + deps.ctx, + ); + const timestampTimezoneLabel = formatSlackTimezoneLabel( + timestampTimezone, + { nowMs: state.turnStartedAt }, + ); + const partialSlackMeta: Partial = { + source: "slack", + eventKind: "message", + channelId, + ...(threadTs ? { threadTs } : {}), + timestampTimezone, + ...(timestampTimezoneLabel ? { timestampTimezoneLabel } : {}), + }; + // `channelTs` is filled in by the post-send reconciliation step in + // `deliverReplyViaCallback`; cast through the Partial to satisfy + // the writer's type at this pre-send boundary. + metadata.slackMeta = writeSlackMetadata( + partialSlackMeta as SlackMessageMetadata, + ); + } + } + + return metadata; +} + +/** + * Reserve an empty assistant row for the LLM call about to begin, stash + * its id on `state.lastAssistantMessageId`, and announce the boundary on + * the wire via `assistant_turn_start`. + * + * Awaited so the row exists and the client has the anchor id BEFORE any + * streaming delta arrives — every subsequent `deps.onEvent` in this LLM + * call stamps `messageId: state.lastAssistantMessageId`, and + * `handleMessageComplete` flushes the final content to the same row via + * `op: "updateContent"` instead of inserting a fresh one. + * + * Multi-LLM-call agent turns (LLM call → tool execution → LLM call) emit + * one `llm_call_started` per call, so each LLM call reserves its own row. + * The read-path `findDisplayTurnEndIndex` collapses consecutive assistant + * rows for the merged history view, matching today's per-call DB layout. + */ +export async function handleLlmCallStarted( + state: EventHandlerState, + deps: EventHandlerDeps, +): Promise { + // Clean up an orphaned reservation from a previous LLM call in this run + // that errored before `message_complete` could finalize it. This covers + // the retryable paths (overflow, ordering, image overflow) where the + // agent loop re-enters with a fresh `run()` and reserves another row; + // without this delete the failed-attempt row stays in the transcript as + // an empty assistant bubble. The finalized-row case is filtered out via + // the `assistantRowAwaitingFinalization` flag — `handleMessageComplete` + // clears it after the successful `updateContent`, so the previous call's + // committed row is never touched here. + // + // Direct `deleteMessageById` (not via the `persistence` pipeline) is + // intentional: a never-finalized reservation has no segments, no + // attachments, and no observable history — undoing it isn't a real + // persistence event for plugins to react to, so routing through the + // pipeline would only widen the mock surface for no observability win. + if (state.assistantRowAwaitingFinalization && state.lastAssistantMessageId) { + try { + deleteMessageById(state.lastAssistantMessageId); + } catch (err) { + // Non-fatal: a leaked empty row is preferable to a turn-level throw. + deps.rlog.warn( + { err, messageId: state.lastAssistantMessageId }, + "Failed to clean up stranded reserved assistant row before new reservation", + ); + } + } + + const metadata = buildAssistantChannelMetadata(state, deps); + const reserveResult = (await runPipeline( + "persistence", + getMiddlewaresFor("persistence"), + defaultPersistenceTerminal, + { + op: "reserve", + conversationId: deps.ctx.conversationId, + role: "assistant", + metadata, + }, + buildHandlerTurnContext(deps), + DEFAULT_TIMEOUTS.persistence, + )) as PersistReserveResult; + state.lastAssistantMessageId = reserveResult.message.id; + state.assistantRowAwaitingFinalization = true; + deps.onEvent({ + type: "assistant_turn_start", + messageId: reserveResult.message.id, + conversationId: deps.ctx.conversationId, + }); +} + // ── Individual Handlers ────────────────────────────────────────────── function handleTextDelta( @@ -431,6 +580,7 @@ function handleTextDelta( type: "assistant_text_delta", text: drained.emitText, conversationId: deps.ctx.conversationId, + messageId: state.lastAssistantMessageId, }); if (deps.shouldGenerateTitle) state.firstAssistantText += drained.emitText; } @@ -468,6 +618,7 @@ function handleThinkingDelta( type: "assistant_thinking_delta", thinking: event.thinking, conversationId: deps.ctx.conversationId, + messageId: state.lastAssistantMessageId, }); } @@ -498,11 +649,12 @@ export function handleToolUse( input: event.input, conversationId: deps.ctx.conversationId, toolUseId: event.id, + messageId: state.lastAssistantMessageId, }); } export function handleToolUsePreviewStart( - _state: EventHandlerState, + state: EventHandlerState, deps: EventHandlerDeps, event: Extract, ): void { @@ -511,6 +663,7 @@ export function handleToolUsePreviewStart( toolUseId: event.toolUseId, toolName: event.toolName, conversationId: deps.ctx.conversationId, + messageId: state.lastAssistantMessageId, }); const statusText = `Preparing ${friendlyToolName(event.toolName)}...`; deps.ctx.emitActivityState( @@ -523,7 +676,7 @@ export function handleToolUsePreviewStart( } function handleToolOutputChunk( - _state: EventHandlerState, + state: EventHandlerState, deps: EventHandlerDeps, event: Extract, ): void { @@ -581,6 +734,7 @@ function handleToolOutputChunk( chunk: event.chunk, conversationId: deps.ctx.conversationId, toolUseId: event.toolUseId, + messageId: state.lastAssistantMessageId, subType: structured.subType, subToolName: structured.subToolName, subToolInput: structured.subToolInput, @@ -593,12 +747,13 @@ function handleToolOutputChunk( chunk: event.chunk, conversationId: deps.ctx.conversationId, toolUseId: event.toolUseId, + messageId: state.lastAssistantMessageId, }); } } export function handleInputJsonDelta( - _state: EventHandlerState, + state: EventHandlerState, deps: EventHandlerDeps, event: Extract, ): void { @@ -612,6 +767,7 @@ export function handleInputJsonDelta( content: event.accumulatedJson, conversationId: deps.ctx.conversationId, toolUseId: event.toolUseId, + messageId: state.lastAssistantMessageId, }); } @@ -738,6 +894,7 @@ export function handleToolResult( diff: event.diff, status: event.status, conversationId: deps.ctx.conversationId, + messageId: state.lastAssistantMessageId, imageData: imageDataList?.[0], imageDataList, toolUseId: event.toolUseId, @@ -994,6 +1151,7 @@ export async function handleMessageComplete( type: "assistant_text_delta", text: state.pendingDirectiveDisplayBuffer, conversationId: deps.ctx.conversationId, + messageId: state.lastAssistantMessageId, }); if (deps.shouldGenerateTitle) state.firstAssistantText += state.pendingDirectiveDisplayBuffer; @@ -1100,52 +1258,6 @@ export async function handleMessageComplete( } as unknown as ContentBlock); } - const assistantChannelMetadata: Record = { - ...provenanceFromTrustContext(deps.ctx.trustContext), - userMessageChannel: deps.turnChannelContext.userMessageChannel, - assistantMessageChannel: deps.turnChannelContext.assistantMessageChannel, - userMessageInterface: deps.turnInterfaceContext.userMessageInterface, - assistantMessageInterface: - deps.turnInterfaceContext.assistantMessageInterface, - sentAt: state.turnStartedAt, - }; - - // When the assistant is replying through Slack, stamp a `slackMeta` - // sub-object so the transcript-rendering / thread-aware-context lookup - // can identify this row's thread without joining tables. - // Persistence happens BEFORE the Slack adapter sends the message, so - // Slack's authoritative `ts` (-> `channelTs`) is not yet known and is - // intentionally omitted here. The post-send reconciliation step in - // `deliverReplyViaCallback` writes `channelTs` back into this row once - // the gateway returns the Slack-assigned ts, restoring a fully-formed - // metadata envelope before any subsequent turn reads the row. - if (deps.turnChannelContext.assistantMessageChannel === "slack") { - const channelId = deps.ctx.trustContext?.requesterChatId; - if (channelId) { - const threadTs = getThreadTs(deps.ctx.conversationId); - const timestampTimezone = resolveAssistantReplyTimestampTimezone( - deps.ctx, - ); - const timestampTimezoneLabel = formatSlackTimezoneLabel( - timestampTimezone, - { nowMs: state.turnStartedAt }, - ); - const partialSlackMeta: Partial = { - source: "slack", - eventKind: "message", - channelId, - ...(threadTs ? { threadTs } : {}), - timestampTimezone, - ...(timestampTimezoneLabel ? { timestampTimezoneLabel } : {}), - }; - assistantChannelMetadata.slackMeta = writeSlackMetadata( - // `channelTs` is filled in by the post-send reconciliation step in - // `deliverReplyViaCallback`; cast through the Partial to satisfy - // the writer's type at this pre-send boundary. - partialSlackMeta as SlackMessageMetadata, - ); - } - } // Redact known-pattern secrets from assistant text blocks before they are // written to durable storage. Non-text blocks (images, UI surfaces) pass // through unchanged. The live model history retains the original values. @@ -1157,32 +1269,123 @@ export async function handleMessageComplete( return block; }); - // Route the assistant-message persistence through the `persistence` - // pipeline. No `syncToDisk` here — the orchestrator separately invokes - // `syncMessageToDisk` on `state.lastAssistantMessageId` after the loop - // completes (see `conversation-agent-loop.ts::syncLastAssistantMessageToDisk`). - const assistantPersistResult = (await runPipeline( + // The row was reserved at `llm_call_started` (with channel metadata + // stamped at that point) and `state.lastAssistantMessageId` carries its + // id. Flush the final content via `updateContent` instead of inserting a + // new row. No `syncToDisk` flag here — the orchestrator separately + // invokes `syncMessageToDisk` on `state.lastAssistantMessageId` after + // the loop completes (see + // `conversation-agent-loop.ts::syncLastAssistantMessageToDisk`). + const assistantMessageId = state.lastAssistantMessageId; + if (!assistantMessageId) { + throw new Error( + "handleMessageComplete fired without a prior llm_call_started reserving an assistant row", + ); + } + const contentJson = JSON.stringify(contentForPersistence); + await runPipeline( "persistence", getMiddlewaresFor("persistence"), defaultPersistenceTerminal, { - op: "add", - conversationId: deps.ctx.conversationId, - role: "assistant", - content: JSON.stringify(contentForPersistence), - metadata: assistantChannelMetadata, + op: "updateContent", + messageId: assistantMessageId, + content: contentJson, }, buildHandlerTurnContext(deps), DEFAULT_TIMEOUTS.persistence, - )) as PersistAddResult; - const assistantMsg = assistantPersistResult.message; - state.lastAssistantMessageId = assistantMsg.id; + ); + state.assistantRowAwaitingFinalization = false; + + // ── Indexing + attention projection (restored from the pre-B3 `add` path) ── + // `reserveMessage` + `updateMessageContent` are CRUD-only: they don't run + // the memory indexer or the attention-cursor projector. The pre-B3 path + // wrote the row via `addMessage`, which ran both as side-effects of the + // insert. Calling them here keeps the assistant row's external state + // (Qdrant segments, conversation attention cursor) in lockstep with the + // finalized content. Both are non-fatal — a memory hiccup must not + // escalate a successful generation into a turn-level throw. Indexing + // intentionally fires AFTER `updateContent` succeeds so we never index + // the empty reserved placeholder. + const finalizedRow = getMessageById( + assistantMessageId, + deps.ctx.conversationId, + ); + if (finalizedRow) { + let provenanceTrustClass: + | "guardian" + | "trusted_contact" + | "unknown" + | undefined; + let automated: boolean | undefined; + if (finalizedRow.metadata) { + try { + const parsedMeta = messageMetadataSchema.safeParse( + JSON.parse(finalizedRow.metadata), + ); + if (parsedMeta.success) { + provenanceTrustClass = parsedMeta.data.provenanceTrustClass; + automated = parsedMeta.data.automated; + } + } catch { + // Malformed metadata JSON — fall through with undefined fields, + // matching the legacy behavior in `addMessage`. + } + } + try { + await indexMessageNow( + { + messageId: assistantMessageId, + conversationId: deps.ctx.conversationId, + role: "assistant", + content: contentJson, + createdAt: finalizedRow.createdAt, + scopeId: "default", + provenanceTrustClass, + automated, + }, + getConfig().memory, + ); + } catch (err) { + deps.rlog.warn( + { + err, + conversationId: deps.ctx.conversationId, + messageId: assistantMessageId, + }, + "Failed to index assistant message for memory (non-fatal)", + ); + } + try { + const attentionStateChanged = projectAssistantMessage({ + conversationId: deps.ctx.conversationId, + messageId: assistantMessageId, + messageAt: finalizedRow.createdAt, + }); + if (attentionStateChanged) { + void publishSyncInvalidation([ + conversationMetadataSyncTag(deps.ctx.conversationId), + ]); + } + } catch (err) { + deps.rlog.warn( + { + err, + conversationId: deps.ctx.conversationId, + messageId: assistantMessageId, + }, + "Failed to project assistant message for attention tracking (non-fatal)", + ); + } + } // Backfill message_id on all LLM request logs from this turn. // The agent loop is single-threaded per conversation, so all rows with - // message_id IS NULL belong to the current turn. + // message_id IS NULL belong to the current turn. The reserved id was + // available before the LLM call ran but the logs are inserted DURING + // the call, so the sweep still runs here. try { - backfillMessageIdOnLogs(deps.ctx.conversationId, assistantMsg.id); + backfillMessageIdOnLogs(deps.ctx.conversationId, assistantMessageId); } catch (err) { deps.rlog.warn( { err }, @@ -1191,7 +1394,10 @@ export async function handleMessageComplete( } try { - backfillMemoryRecallLogMessageId(deps.ctx.conversationId, assistantMsg.id); + backfillMemoryRecallLogMessageId( + deps.ctx.conversationId, + assistantMessageId, + ); } catch (err) { deps.rlog.warn( { err }, @@ -1202,7 +1408,7 @@ export async function handleMessageComplete( try { backfillMemoryV2ActivationMessageId( deps.ctx.conversationId, - assistantMsg.id, + assistantMessageId, ); } catch (err) { deps.rlog.warn( @@ -1402,6 +1608,9 @@ export async function dispatchAgentEvent( ): Promise { try { switch (event.type) { + case "llm_call_started": + await handleLlmCallStarted(state, deps); + break; case "text_delta": handleTextDelta(state, deps, event); break; @@ -1442,6 +1651,7 @@ export async function dispatchAgentEvent( input: event.input, conversationId: deps.ctx.conversationId, toolUseId: event.toolUseId, + messageId: state.lastAssistantMessageId, }); break; } @@ -1521,6 +1731,7 @@ export async function dispatchAgentEvent( isError: event.isError, conversationId: deps.ctx.conversationId, toolUseId: event.toolUseId, + messageId: state.lastAssistantMessageId, ...(metadata ? { activityMetadata: { webSearch: metadata } } : {}), }); break; diff --git a/assistant/src/daemon/conversation-agent-loop.ts b/assistant/src/daemon/conversation-agent-loop.ts index fcc44952aef..cacea017f42 100644 --- a/assistant/src/daemon/conversation-agent-loop.ts +++ b/assistant/src/daemon/conversation-agent-loop.ts @@ -59,6 +59,7 @@ import { commitAppTurnChanges } from "../memory/app-git-service.js"; import { getApp, listAppFiles, resolveAppDir } from "../memory/app-store.js"; import { enqueueAutoAnalysisOnCompaction } from "../memory/auto-analysis-enqueue.js"; import { + deleteMessageById, getConversation, getConversationOriginChannel, getConversationOriginInterface, @@ -3225,6 +3226,34 @@ export async function runAgentLoopImpl( !abortController.signal.aborted && !yieldedForHandoff ) { + // Drop any reservation stranded by the failed LLM call before + // inserting the synthetic error message. The B3 pre-allocation + // path reserves an empty assistant row at `llm_call_started`; + // when the call exits through the provider-error branch (no + // `message_complete`), `assistantRowAwaitingFinalization` stays + // true. Without this delete the transcript would carry both the + // empty reserved row AND the error message — and downstream sync + // (`syncLastAssistantMessageToDisk`) would mis-target the empty + // row. After delete we set `lastAssistantMessageId` to the new + // error row's id so the post-loop emission paths still point at + // a real message. + if ( + state.assistantRowAwaitingFinalization && + state.lastAssistantMessageId + ) { + // Direct `deleteMessageById` (not via the `persistence` pipeline): + // see the same rationale on the matching cleanup in + // `handleLlmCallStarted` — an unfinalized reservation has no + // observable history for plugins. + try { + deleteMessageById(state.lastAssistantMessageId); + } catch (err) { + rlog.warn( + { err, messageId: state.lastAssistantMessageId }, + "Failed to clean up stranded reserved assistant row on provider-error path (non-fatal)", + ); + } + } const errChannelMeta = { ...provenanceFromTrustContext(ctx.trustContext), userMessageChannel: capturedTurnChannelContext.userMessageChannel, @@ -3252,6 +3281,15 @@ export async function runAgentLoopImpl( DEFAULT_TIMEOUTS.persistence, )) as PersistAddResult; persistedErrorAssistantMessage = true; + // Repoint `lastAssistantMessageId` at the synthetic error row so the + // post-loop sync, attachment resolution, and `message_complete`/ + // `generation_handoff` emissions all reference a real, persisted + // message id. The previous reservation (if any) was already deleted + // above. Mark finalization complete so the next LLM call in this run + // (or a downstream handler) doesn't try to clean up an id that + // already corresponds to a finalized row. + state.lastAssistantMessageId = errorPersistResult.message.id; + state.assistantRowAwaitingFinalization = false; newMessages.push(errorAssistantMessage); // Pipe the just-assigned message id into any orphaned LLM request log // row(s) for this turn. The success path links rows via diff --git a/assistant/src/daemon/wake-target-adapter.ts b/assistant/src/daemon/wake-target-adapter.ts index 5d5c7174e15..44f92541b86 100644 --- a/assistant/src/daemon/wake-target-adapter.ts +++ b/assistant/src/daemon/wake-target-adapter.ts @@ -139,6 +139,15 @@ function translateAgentEventToServerMessage( case "max_tokens_reached": case "agent_loop_exit": return null; + case "llm_call_started": + // The wake path persists its assistant tail via `persistTailMessage` + // (an `addMessage`-shaped call below) rather than via the main + // event-handler's `reserve` → `updateContent` pipeline, so there is + // no row to reserve here. Translation returns null and the wake + // path's existing end-of-turn persist continues to mint the row. + // Following up with full wake-path pre-allocation parity is tracked + // as a B3 follow-up. + return null; } }