diff --git a/assistant/src/daemon/server.ts b/assistant/src/daemon/server.ts index 5b5315c9a00..972b1ce616b 100644 --- a/assistant/src/daemon/server.ts +++ b/assistant/src/daemon/server.ts @@ -7,6 +7,7 @@ import { setBroadcastToAllClients, } from "../acp/index.js"; import { enrichMessageWithSourcePaths } from "../agent/attachments.js"; +import type { AgentEvent } from "../agent/loop.js"; import { createAssistantMessage, createUserMessage, @@ -1673,6 +1674,125 @@ function extractConversationId(msg: ServerMessage): string | undefined { return undefined; } +/** + * Translate a raw {@link AgentEvent} from the agent loop into the + * corresponding {@link ServerMessage} wire frame. The normal user-turn + * path does this via the full state-aware handler in + * `conversation-agent-loop-handlers.ts`; the wake path has no tool + * accounting, title generation, or activity-state tracking to worry + * about, so we only need the subset that produces client-visible + * frames. Events that have no client-visible wire shape (usage, error, + * preview/input-json deltas, etc.) are dropped — they produce no UI. + * + * Keeping this translator co-located with the wake adapter preserves + * the runtime/daemon layering: `runtime/agent-wake.ts` never imports + * `message-protocol.ts` or wire shapes, and the daemon owns all + * translation from agent-loop semantics to client frames. + */ +function translateAgentEventToServerMessage( + event: AgentEvent, + conversationId: string, +): ServerMessage | null { + switch (event.type) { + case "text_delta": + return { + type: "assistant_text_delta", + text: event.text, + conversationId, + }; + case "thinking_delta": + return { + type: "assistant_thinking_delta", + thinking: event.thinking, + conversationId, + }; + case "tool_use": + return { + type: "tool_use_start", + toolName: event.name, + input: event.input, + conversationId, + toolUseId: event.id, + }; + case "tool_use_preview_start": + return { + type: "tool_use_preview_start", + toolUseId: event.toolUseId, + toolName: event.toolName, + conversationId, + }; + case "tool_output_chunk": + return { + type: "tool_output_chunk", + chunk: event.chunk, + conversationId, + toolUseId: event.toolUseId, + }; + case "tool_result": { + const imageBlocks = event.contentBlocks?.filter( + (b): b is Extract => b.type === "image", + ); + const imageDataList = imageBlocks?.length + ? imageBlocks.map((b) => b.source.data) + : undefined; + return { + type: "tool_result", + toolName: "", + result: event.content, + isError: event.isError, + diff: event.diff, + status: event.status, + conversationId, + imageData: imageDataList?.[0], + imageDataList, + toolUseId: event.toolUseId, + }; + } + case "server_tool_start": + return { + type: "tool_use_start", + toolName: event.name, + input: event.input, + conversationId, + toolUseId: event.toolUseId, + }; + case "server_tool_complete": { + let resultText = ""; + if (Array.isArray(event.content) && event.content.length > 0) { + resultText = (event.content as unknown[]) + .filter( + (r): r is { type: string; title: string; url: string } => + typeof r === "object" && + r != null && + (r as { type?: string }).type === "web_search_result", + ) + .map((r) => `${r.title}\n${r.url}`) + .join("\n\n"); + } + return { + type: "tool_result", + toolName: "web_search", + result: resultText, + isError: event.isError, + conversationId, + toolUseId: event.toolUseId, + }; + } + case "message_complete": + return { + type: "message_complete", + conversationId, + }; + // No wire frame for these — usage/error/input_json_delta are either + // server-internal (accounting/classification) or app-only debug + // streams the client doesn't surface for wake-originated turns. + case "input_json_delta": + case "usage": + case "error": + return null; + } +} + /** * Adapt a live {@link Conversation} to the narrow {@link WakeTarget} * surface expected by `wakeAgentForOpportunity()`. Kept here so the @@ -1687,7 +1807,16 @@ function conversationToWakeTarget(conversation: Conversation): WakeTarget { pushMessage: (msg) => { conversation.messages.push(msg); }, - emitToClient: (msg) => conversation.sendToClient(msg), + emitAgentEvent: (event) => { + const frame = translateAgentEventToServerMessage( + event, + conversation.conversationId, + ); + if (frame) conversation.sendToClient(frame); + }, isProcessing: () => conversation.isProcessing(), + markProcessing: (on) => { + conversation.processing = on; + }, }; } diff --git a/assistant/src/runtime/__tests__/agent-wake.test.ts b/assistant/src/runtime/__tests__/agent-wake.test.ts index 5801c81f8fc..7672a55ddc6 100644 --- a/assistant/src/runtime/__tests__/agent-wake.test.ts +++ b/assistant/src/runtime/__tests__/agent-wake.test.ts @@ -4,8 +4,8 @@ * * Exercise strategy: the wake helper takes a `resolveTarget` dependency so * these tests stub out the heavyweight `Conversation` class with a minimal - * `WakeTarget` that just tracks buffered messages, emitted events, and a - * scripted `agentLoop.run()` response. + * `WakeTarget` that just tracks agent-event forwards, buffered messages, + * and a scripted `agentLoop.run()` response. * * The `addMessage` import from `memory/conversation-crud.ts` is stubbed * via `mock.module()` so we can assert on persistence without touching a @@ -46,21 +46,25 @@ import { // ── Test helpers ───────────────────────────────────────────────────── interface MockTarget extends WakeTarget { - emittedEvents: unknown[]; + emittedEvents: AgentEvent[]; pushedMessages: Message[]; runCalls: Array<{ input: Message[]; requestId?: string }>; + processingToggles: boolean[]; } function makeTarget(options: { conversationId?: string; baseline?: Message[]; scriptedAssistant?: Message | null; + /** Extra tail messages appended *after* `scriptedAssistant` (e.g. tool_result, follow-up assistant). */ + scriptedTail?: Message[]; scriptedEvents?: AgentEvent[]; isProcessing?: boolean; }): MockTarget { - const emittedEvents: unknown[] = []; + const emittedEvents: AgentEvent[] = []; const pushedMessages: Message[] = []; const runCalls: Array<{ input: Message[]; requestId?: string }> = []; + const processingToggles: boolean[] = []; const history: Message[] = [...(options.baseline ?? [])]; let processing = options.isProcessing ?? false; @@ -69,6 +73,7 @@ function makeTarget(options: { emittedEvents, pushedMessages, runCalls, + processingToggles, agentLoop: { run: async ( input: Message[], @@ -81,7 +86,7 @@ function makeTarget(options: { for (const ev of options.scriptedEvents ?? []) { await onEvent(ev); } - // Final history = input + optional assistant message. + // Final history = input + optional assistant message + optional tail. const next = [...input]; if (options.scriptedAssistant) { next.push(options.scriptedAssistant); @@ -90,6 +95,11 @@ function makeTarget(options: { message: options.scriptedAssistant, }); } + if (options.scriptedTail) { + for (const tailMsg of options.scriptedTail) { + next.push(tailMsg); + } + } return next; }, }, @@ -98,13 +108,18 @@ function makeTarget(options: { pushedMessages.push(msg); history.push(msg); }, - emitToClient: (msg) => { - emittedEvents.push(msg); + emitAgentEvent: (event) => { + emittedEvents.push(event); }, isProcessing: () => processing, + markProcessing: (on: boolean) => { + processing = on; + processingToggles.push(on); + }, }; - // Expose processing setter via test-only side-channel + // Expose processing setter via test-only side-channel for tests that + // simulate an external (non-wake) processing state. (target as unknown as { setProcessing: (v: boolean) => void }).setProcessing = (v: boolean) => { processing = v; @@ -200,14 +215,152 @@ describe("wakeAgentForOpportunity", () => { ); // Assistant message pushed into live history. expect(target.pushedMessages).toContainEqual(assistantMessage); - // message_complete event flushed to the client. + // message_complete event flushed to the client via the translator + // surface (raw AgentEvent — adapter is responsible for wire shape). const flushed = target.emittedEvents.find( - (e) => - typeof e === "object" && e !== null && (e as { type?: string }).type === "message_complete", + (e) => e.type === "message_complete", ); expect(flushed).toBeDefined(); }); + test("persists full multi-turn tail (assistant → tool_result → follow-up assistant)", async () => { + // Simulate a wake that produces a tool_use, an executed tool_result + // user message, and a follow-up assistant summary. All three must be + // persisted; otherwise the next rehydration loses the tool_result + // and the provider rejects the orphaned tool_use. + const firstAssistant: Message = { + role: "assistant", + content: [ + { + type: "tool_use", + id: "tu-1", + name: "meet_send_chat", + input: { text: "Sure" }, + }, + ], + }; + const toolResultUserMsg: Message = { + role: "user", + content: [ + { + type: "tool_result", + tool_use_id: "tu-1", + content: "sent", + }, + ], + }; + const followupAssistant: Message = { + role: "assistant", + content: [{ type: "text", text: "Done." }], + }; + + const target = makeTarget({ + baseline: [{ role: "user", content: [{ type: "text", text: "hi" }] }], + scriptedAssistant: firstAssistant, + scriptedTail: [toolResultUserMsg, followupAssistant], + }); + + const result = await wakeAgentForOpportunity( + { + conversationId: target.conversationId, + hint: "question directed at assistant", + source: "meet-chat-opportunity", + }, + { resolveTarget: async () => target }, + ); + + expect(result).toEqual({ invoked: true, producedToolCalls: true }); + + // All three tail messages persisted in order. + expect(persistedMessages).toHaveLength(3); + expect(persistedMessages[0]).toMatchObject({ role: "assistant" }); + expect(JSON.parse(persistedMessages[0]!.content)).toEqual( + firstAssistant.content, + ); + expect(persistedMessages[1]).toMatchObject({ role: "user" }); + expect(JSON.parse(persistedMessages[1]!.content)).toEqual( + toolResultUserMsg.content, + ); + expect(persistedMessages[2]).toMatchObject({ role: "assistant" }); + expect(JSON.parse(persistedMessages[2]!.content)).toEqual( + followupAssistant.content, + ); + + // All three also pushed into live history so next turn sees them. + expect(target.pushedMessages).toHaveLength(3); + expect(target.pushedMessages[0]).toEqual(firstAssistant); + expect(target.pushedMessages[1]).toEqual(toolResultUserMsg); + expect(target.pushedMessages[2]).toEqual(followupAssistant); + }); + + test("marks processing true during the run and false afterwards", async () => { + const target = makeTarget({ + baseline: [{ role: "user", content: [{ type: "text", text: "hi" }] }], + scriptedAssistant: { + role: "assistant", + content: [{ type: "text", text: "reply" }], + }, + }); + + // Snapshot isProcessing() inside the run to prove we actually + // hold the processing flag while agentLoop.run executes. + const observedDuringRun: boolean[] = []; + const originalRun = target.agentLoop.run; + target.agentLoop.run = async (input, onEvent, signal, requestId) => { + observedDuringRun.push(target.isProcessing()); + return originalRun(input, onEvent, signal, requestId); + }; + + await wakeAgentForOpportunity( + { + conversationId: target.conversationId, + hint: "x", + source: "unit-test", + }, + { resolveTarget: async () => target }, + ); + + // markProcessing toggled on then off exactly once. + expect(target.processingToggles).toEqual([true, false]); + // And the flag was observed as true inside the run body. + expect(observedDuringRun).toEqual([true]); + // Back to idle by the time the wake returns. + expect(target.isProcessing()).toBe(false); + }); + + test("marks processing false even when the agent loop throws", async () => { + const history: Message[] = []; + const toggles: boolean[] = []; + let processing = false; + const target: WakeTarget = { + conversationId: "conv-err-guard", + agentLoop: { + run: async () => { + throw new Error("LLM exploded"); + }, + }, + getMessages: () => history, + pushMessage: () => {}, + emitAgentEvent: () => {}, + isProcessing: () => processing, + markProcessing: (on) => { + processing = on; + toggles.push(on); + }, + }; + + const result = await wakeAgentForOpportunity( + { conversationId: "conv-err-guard", hint: "boom", source: "t" }, + { resolveTarget: async () => target }, + ); + + expect(result).toEqual({ invoked: true, producedToolCalls: false }); + // Critical: the finally block must have released the flag despite + // the thrown error, otherwise the next user turn would hang. + expect(toggles).toEqual([true, false]); + expect(processing).toBe(false); + }); + test("two concurrent wakes on the same conversation are serialized", async () => { // Build a target whose agentLoop.run resolves only when we signal. const gate1 = Promise.withResolvers(); @@ -217,6 +370,7 @@ describe("wakeAgentForOpportunity", () => { let callIndex = 0; const history: Message[] = []; + let processing = false; const target: WakeTarget = { conversationId: "conv-serialize", agentLoop: { @@ -236,8 +390,11 @@ describe("wakeAgentForOpportunity", () => { pushMessage: (msg) => { history.push(msg); }, - emitToClient: () => {}, - isProcessing: () => false, + emitAgentEvent: () => {}, + isProcessing: () => processing, + markProcessing: (on) => { + processing = on; + }, }; const deps = { resolveTarget: async () => target }; @@ -277,8 +434,14 @@ describe("wakeAgentForOpportunity", () => { }, getMessages: () => history, pushMessage: (msg) => history.push(msg), - emitToClient: () => {}, + emitAgentEvent: () => {}, isProcessing: () => processing, + // The wake's own markProcessing updates track the flag too — the + // outer "user turn" holds it at true until setProcessing(false) + // is called below. + markProcessing: (on) => { + processing = on; + }, setProcessing: (v) => { processing = v; }, @@ -321,6 +484,7 @@ describe("wakeAgentForOpportunity", () => { test("agent loop error is treated as a no-op", async () => { const history: Message[] = []; + let processing = false; const target: WakeTarget = { conversationId: "conv-err", agentLoop: { @@ -330,8 +494,11 @@ describe("wakeAgentForOpportunity", () => { }, getMessages: () => history, pushMessage: () => {}, - emitToClient: () => {}, - isProcessing: () => false, + emitAgentEvent: () => {}, + isProcessing: () => processing, + markProcessing: (on) => { + processing = on; + }, }; const result = await wakeAgentForOpportunity( diff --git a/assistant/src/runtime/agent-wake.ts b/assistant/src/runtime/agent-wake.ts index 55432cfe4d6..72972c21660 100644 --- a/assistant/src/runtime/agent-wake.ts +++ b/assistant/src/runtime/agent-wake.ts @@ -21,6 +21,10 @@ * - If a user turn (or another wake) is currently in flight on the same * conversation, the wake is queued behind it (single-flight per * `conversationId`). + * - While the wake's agent loop is running, the conversation is marked + * as processing (via {@link WakeTarget.markProcessing}) so a user send + * that arrives mid-wake is queued by `enqueueMessage` instead of + * launching a concurrent `agentLoop.run()` on the same conversation. * * Logging: * - Emits one structured log line per wake: @@ -33,7 +37,6 @@ */ import type { AgentEvent, AgentLoop } from "../agent/loop.js"; -import type { ServerMessage } from "../daemon/message-protocol.js"; import { addMessage } from "../memory/conversation-crud.js"; import type { Message } from "../providers/types.js"; import { getLogger } from "../util/logger.js"; @@ -45,6 +48,15 @@ const log = getLogger("agent-wake"); * interface rather than importing `Conversation` directly so the wake * helper stays decoupled from the heavyweight conversation class and is * easy to exercise under unit tests. + * + * Translation note: the wake deliberately hands the adapter a raw + * {@link AgentEvent} via {@link emitAgentEvent} rather than a + * `ServerMessage`. The normal user-turn path translates `AgentEvent` into + * the correctly-shaped wire protocol frames (e.g. + * `text_delta` → `assistant_text_delta` with `conversationId`) via the + * canonical handler in `conversation-agent-loop-handlers.ts`. Passing raw + * events means the adapter can reuse that translation rather than the + * wake helper shipping malformed frames. */ export interface WakeTarget { readonly conversationId: string; @@ -52,14 +64,29 @@ export interface WakeTarget { /** * Live LLM-visible history. We read a snapshot, append the internal hint * for the run, and then (on non-empty output) append the resulting - * assistant message to this array so subsequent turns see it. + * assistant message(s) to this array so subsequent turns see them. */ getMessages(): Message[]; pushMessage(message: Message): void; - /** Client emitter — e.g. SSE. We only call this when the wake produces output. */ - emitToClient(msg: ServerMessage): void; + /** + * Forward a raw agent event so the adapter can translate it to the + * correct `ServerMessage` shape (e.g. stamping `conversationId`, + * renaming `text_delta` → `assistant_text_delta`) before emission. + * + * Only called when the wake produces output worth emitting — silent + * no-op wakes never flush buffered events. + */ + emitAgentEvent(event: AgentEvent): void; /** True if the conversation is already processing a turn. */ isProcessing(): boolean; + /** + * Toggle the conversation's in-flight processing marker. The wake + * wraps its `agentLoop.run()` invocation in + * `markProcessing(true) … markProcessing(false)` so a concurrent user + * send sees `isProcessing() === true` and queues the message instead + * of spawning a parallel agent loop. + */ + markProcessing(on: boolean): void; } export interface WakeOptions { @@ -182,44 +209,49 @@ async function waitUntilIdle( } /** - * Inspect the final assistant message of the post-run history to decide - * whether the wake produced output worth persisting/emitting. + * Inspect the post-run history slice to decide whether the wake produced + * output worth persisting/emitting, and collect any tool-use names from + * the *first* assistant reply (used only for logging). */ -function inspectAssistantOutput( +function inspectWakeOutput( baselineLength: number, updatedHistory: Message[], ): { - assistantMessage: Message | null; + tailMessages: Message[]; hasVisibleText: boolean; toolUseNames: string[]; } { // The agent loop appends assistant messages (and tool_result user // messages) onto the history it was given. We gave it baseline + // internal hint, so anything at index >= baselineLength + 1 came from - // the run. The *first* message past the hint is the assistant reply. + // the run. const firstAssistantIndex = baselineLength + 1; if (updatedHistory.length <= firstAssistantIndex) { - return { assistantMessage: null, hasVisibleText: false, toolUseNames: [] }; - } - const assistantMessage = updatedHistory[firstAssistantIndex]; - if (!assistantMessage || assistantMessage.role !== "assistant") { - return { assistantMessage: null, hasVisibleText: false, toolUseNames: [] }; + return { tailMessages: [], hasVisibleText: false, toolUseNames: [] }; } - const blocks = Array.isArray(assistantMessage.content) - ? assistantMessage.content - : []; + const tailMessages = updatedHistory.slice(firstAssistantIndex); + + // Scan every tail message for visible text or tool_use blocks. A + // multi-step run (assistant → tool_result → assistant) still counts as + // "produced output" when the final assistant message is just a summary + // — we must persist the entire tail so the DB mirrors in-memory + // history. let hasVisibleText = false; const toolUseNames: string[] = []; - for (const block of blocks) { - if (block.type === "text" && typeof block.text === "string") { - if (block.text.trim().length > 0) { - hasVisibleText = true; + for (const msg of tailMessages) { + if (msg.role !== "assistant") continue; + const blocks = Array.isArray(msg.content) ? msg.content : []; + for (const block of blocks) { + if (block.type === "text" && typeof block.text === "string") { + if (block.text.trim().length > 0) { + hasVisibleText = true; + } + } else if (block.type === "tool_use") { + toolUseNames.push(block.name); } - } else if (block.type === "tool_use") { - toolUseNames.push(block.name); } } - return { assistantMessage, hasVisibleText, toolUseNames }; + return { tailMessages, hasVisibleText, toolUseNames }; } /** @@ -271,19 +303,21 @@ export async function wakeAgentForOpportunity( // Buffer events during the run. If the agent produces no visible // output and no tool calls, we drop everything silently. If it does, - // we flush the buffer to the client so the client sees normal - // streaming events (deltas, tool_use, tool_result, etc.). - const buffered: ServerMessage[] = []; + // we flush the buffered events via the target's translation-aware + // emitter so clients receive correctly-shaped wire frames (e.g. + // `assistant_text_delta` with `conversationId`, not the raw + // `text_delta` variant of `AgentEvent`). + const buffered: AgentEvent[] = []; const onEvent = (event: AgentEvent): void => { - // AgentEvent and ServerMessage share several variants by shape. - // The conversation's runtime normally translates AgentEvent into - // client events via a richer handler; for wake, we buffer the raw - // event types and forward only those that are directly - // client-safe. Unknown types are dropped (they produce no UI). - const ev = event as unknown as ServerMessage; - buffered.push(ev); + buffered.push(event); }; + // Mark processing for the duration of the run so a concurrent user + // send is queued by `enqueueMessage()` rather than spawning a second + // concurrent agent loop on the same conversation (which would + // interleave writes to `conversation.messages`). + target.markProcessing(true); + let updatedHistory: Message[]; let runError: Error | null = null; try { @@ -296,6 +330,18 @@ export async function wakeAgentForOpportunity( } catch (err) { runError = err instanceof Error ? err : new Error(String(err)); updatedHistory = runInput; + } finally { + // Release the processing marker regardless of success/failure so + // the next user turn (or wake) isn't blocked waiting on a stale + // flag. + try { + target.markProcessing(false); + } catch (err) { + log.warn( + { conversationId, source, err }, + "agent-wake: markProcessing(false) threw; continuing", + ); + } } const durationMs = nowFn() - startedAt; @@ -307,13 +353,15 @@ export async function wakeAgentForOpportunity( return { invoked: true, producedToolCalls: false }; } - const { assistantMessage, hasVisibleText, toolUseNames } = - inspectAssistantOutput(baseline.length, updatedHistory); + const { tailMessages, hasVisibleText, toolUseNames } = inspectWakeOutput( + baseline.length, + updatedHistory, + ); const producedToolCalls = toolUseNames.length > 0; const producedOutput = producedToolCalls || hasVisibleText; - if (!producedOutput || !assistantMessage) { + if (!producedOutput || tailMessages.length === 0) { log.info( { source, @@ -327,13 +375,14 @@ export async function wakeAgentForOpportunity( return { invoked: true, producedToolCalls: false }; } - // Output produced: flush buffered client events and persist the - // assistant message so the transcript stays consistent. The internal - // hint is NOT persisted and NOT emitted — only the assistant reply - // (and any downstream tool-result user messages) is. + // Output produced: flush buffered client events through the target's + // translator and persist the full tail (assistant messages + any + // intervening tool_result user messages) so the DB mirrors the + // in-memory history. The internal hint is NOT persisted and NOT + // emitted. for (const event of buffered) { try { - target.emitToClient(event); + target.emitAgentEvent(event); } catch (err) { log.warn( { conversationId, source, err }, @@ -342,24 +391,30 @@ export async function wakeAgentForOpportunity( } } - // Append assistant message and any subsequent tool_result user - // messages to live history. The hint itself stays out of history. - for (let i = baseline.length + 1; i < updatedHistory.length; i++) { - const msg = updatedHistory[i]; - if (msg) target.pushMessage(msg); + // Append every tail message to live in-memory history. Without this, + // the next turn would rebuild from DB and lose the live references. + for (const msg of tailMessages) { + target.pushMessage(msg); } - try { - await addMessage( - conversationId, - assistantMessage.role, - JSON.stringify(assistantMessage.content), - ); - } catch (err) { - log.warn( - { conversationId, source, err }, - "agent-wake: failed to persist assistant message", - ); + // Persist every tail message (assistant outputs + tool_result user + // messages from the loop's own tool execution). If we only persisted + // the first assistant message, a rehydration from DB would have a + // `tool_use` with no matching `tool_result`, which the provider + // would reject on the next turn. + for (const msg of tailMessages) { + try { + await addMessage( + conversationId, + msg.role, + JSON.stringify(msg.content), + ); + } catch (err) { + log.warn( + { conversationId, source, err, role: msg.role }, + "agent-wake: failed to persist wake-tail message", + ); + } } log.info( @@ -369,6 +424,7 @@ export async function wakeAgentForOpportunity( durationMs, producedToolCalls, toolNamesCalled: toolUseNames, + tailMessageCount: tailMessages.length, }, "agent-wake: produced output", ); diff --git a/skills/meet-join/daemon/__tests__/proactive-chat-e2e.test.ts b/skills/meet-join/daemon/__tests__/proactive-chat-e2e.test.ts index 7229cb903ef..e8f2ba74c33 100644 --- a/skills/meet-join/daemon/__tests__/proactive-chat-e2e.test.ts +++ b/skills/meet-join/daemon/__tests__/proactive-chat-e2e.test.ts @@ -470,8 +470,9 @@ describe("proactive-chat E2E — Tier 1 hit → Tier 2 confirms → agent wake agentLoop: mockAgent.loop, getMessages: () => history, pushMessage: (msg) => history.push(msg), - emitToClient: () => {}, + emitAgentEvent: () => {}, isProcessing: () => false, + markProcessing: () => {}, }; // Opportunity callback → real agent wake. We await the wake @@ -583,8 +584,9 @@ describe("proactive-chat E2E — Tier 1 hit → Tier 2 confirms → agent wake agentLoop: mockAgent.loop, getMessages: () => history, pushMessage: (msg) => history.push(msg), - emitToClient: () => {}, + emitAgentEvent: () => {}, isProcessing: () => false, + markProcessing: () => {}, }; const wakeSpy = mock(async () => { @@ -666,8 +668,9 @@ describe("proactive-chat E2E — Tier 1 hit → Tier 2 confirms → agent wake agentLoop: mockAgent.loop, getMessages: () => history, pushMessage: (msg) => history.push(msg), - emitToClient: () => {}, + emitAgentEvent: () => {}, isProcessing: () => false, + markProcessing: () => {}, }; const wakePromises: Array< @@ -759,8 +762,9 @@ describe("proactive-chat E2E — Tier 1 hit → Tier 2 confirms → agent wake agentLoop: mockAgent.loop, getMessages: () => history, pushMessage: (msg) => history.push(msg), - emitToClient: () => {}, + emitAgentEvent: () => {}, isProcessing: () => false, + markProcessing: () => {}, }; const wakePromises: Array> = [];