diff --git a/assistant/src/__tests__/call-controller.test.ts b/assistant/src/__tests__/call-controller.test.ts index e1e445d4968..3ff227e6ff4 100644 --- a/assistant/src/__tests__/call-controller.test.ts +++ b/assistant/src/__tests__/call-controller.test.ts @@ -542,8 +542,10 @@ describe('call-controller', () => { opts.signal?.addEventListener('abort', () => { clearTimeout(timeout); - const err = new Error('aborted'); - err.name = 'AbortError'; + // In the real system, generation_cancelled triggers + // onComplete via the event sink. The AbortSignal listener + // in call-controller also resolves turnComplete defensively. + opts.onComplete(); resolve({ runId: 'run-1', abort: () => {} }); }, { once: true }); }); @@ -561,6 +563,39 @@ describe('call-controller', () => { controller.destroy(); }); + test('handleInterrupt: turnComplete settles even when event sink callbacks are not called', async () => { + // Simulate a turn that never calls onComplete or onError on abort — + // the defensive AbortSignal listener in runTurn() should settle the promise. + mockStartVoiceTurn.mockImplementation(async (opts: { signal?: AbortSignal; onTextDelta: (t: string) => void; onComplete: () => void }) => { + return new Promise((resolve) => { + const timeout = setTimeout(() => { + opts.onTextDelta('Long running turn'); + opts.onComplete(); + resolve({ runId: 'run-1', abort: () => {} }); + }, 5000); + + opts.signal?.addEventListener('abort', () => { + clearTimeout(timeout); + // Intentionally do NOT call onComplete — simulates the old + // broken path where generation_cancelled was not forwarded. + resolve({ runId: 'run-1', abort: () => {} }); + }, { once: true }); + }); + }); + + const { controller } = setupController(); + const turnPromise = controller.handleCallerUtterance('Start speaking'); + await new Promise((r) => setTimeout(r, 5)); + controller.handleInterrupt(); + + // Should not hang — the AbortSignal listener resolves the promise + await turnPromise; + + expect(controller.getState()).toBe('idle'); + + controller.destroy(); + }); + // ── destroy ─────────────────────────────────────────────────────── test('destroy: unregisters controller', () => { @@ -583,6 +618,48 @@ describe('call-controller', () => { expect(() => controller.destroy()).not.toThrow(); }); + test('destroy: during active turn does not trigger post-turn side effects', async () => { + // Simulate a turn that completes after destroy() is called + mockStartVoiceTurn.mockImplementation(async (opts: { signal?: AbortSignal; onTextDelta: (t: string) => void; onComplete: () => void }) => { + return new Promise((resolve) => { + const timeout = setTimeout(() => { + opts.onTextDelta('This is a long response'); + opts.onComplete(); + resolve({ runId: 'run-1', abort: () => {} }); + }, 1000); + + opts.signal?.addEventListener('abort', () => { + clearTimeout(timeout); + // The defensive abort listener in runTurn resolves turnComplete + opts.onComplete(); + resolve({ runId: 'run-1', abort: () => {} }); + }, { once: true }); + }); + }); + + const { relay, controller } = setupController(); + const turnPromise = controller.handleCallerUtterance('Start speaking'); + + // Let the turn start + await new Promise((r) => setTimeout(r, 5)); + + // Destroy the controller while the turn is active + controller.destroy(); + + // Wait for the turn to settle + await turnPromise; + + // Verify that NO spurious post-turn side effects occurred after destroy: + // - No final empty-string sendTextToken('', true) call after abort + // The only end marker should be from handleInterrupt, not from post-turn logic + const endMarkers = relay.sentTokens.filter((t) => t.token === '' && t.last === true); + + // destroy() increments llmRunVersion, so isCurrentRun() returns false + // for the aborted turn, preventing post-turn side effects including + // the spurious relay.sendTextToken('', true) on line 418. + expect(endMarkers.length).toBe(0); + }); + // ── handleUserInstruction ───────────────────────────────────────── test('handleUserInstruction: injects instruction marker and triggers turn when idle', async () => { diff --git a/assistant/src/__tests__/run-orchestrator.test.ts b/assistant/src/__tests__/run-orchestrator.test.ts index 76ec5775f1b..6eb09a20c4b 100644 --- a/assistant/src/__tests__/run-orchestrator.test.ts +++ b/assistant/src/__tests__/run-orchestrator.test.ts @@ -570,6 +570,84 @@ describe('eventSink forwarding', () => { expect(receivedTools[0].input).toEqual({ query: 'test' }); }); + test('eventSink receives onMessageComplete on generation_cancelled', async () => { + const conversation = createConversation('event sink cancelled test'); + const cancelledMsg: ServerMessage = { + type: 'generation_cancelled', + sessionId: conversation.id, + }; + const session = makeSessionWithEvent(cancelledMsg); + + let messageCompleteCount = 0; + const receivedErrors: string[] = []; + const sink: VoiceRunEventSink = { + onTextDelta: () => {}, + onMessageComplete: () => { messageCompleteCount++; }, + onError: (msg) => receivedErrors.push(msg), + onToolUse: () => {}, + }; + + const orchestrator = new RunOrchestrator({ + getOrCreateSession: async () => session, + resolveAttachments: () => [], + deriveDefaultStrictSideEffects: () => false, + }); + + await orchestrator.startRun(conversation.id, 'Hello', undefined, { + eventSink: sink, + }); + await new Promise((r) => setTimeout(r, 50)); + + // generation_cancelled should be forwarded as onMessageComplete + expect(messageCompleteCount).toBe(1); + // It should NOT trigger onError + expect(receivedErrors).toHaveLength(0); + }); + + test('eventSink receives onError when runAgentLoop throws', async () => { + const conversation = createConversation('event sink exception test'); + + // Build a session whose runAgentLoop throws an exception instead of + // emitting events — simulating an unhandled crash in the agent loop. + const session = { + isProcessing: () => false, + persistUserMessage: () => undefined as unknown as string, + memoryPolicy: { scopeId: 'default', includeDefaultFallback: false, strictSideEffects: false }, + setChannelCapabilities: () => {}, + setAssistantId: () => {}, + setGuardianContext: () => {}, + setCommandIntent: () => {}, + setTurnChannelContext: () => {}, + updateClient: () => {}, + runAgentLoop: async () => { + throw new Error('Unexpected agent crash'); + }, + handleConfirmationResponse: () => {}, + } as unknown as Session; + + const receivedErrors: string[] = []; + const sink: VoiceRunEventSink = { + onTextDelta: () => {}, + onMessageComplete: () => {}, + onError: (msg) => receivedErrors.push(msg), + onToolUse: () => {}, + }; + + const orchestrator = new RunOrchestrator({ + getOrCreateSession: async () => session, + resolveAttachments: () => [], + deriveDefaultStrictSideEffects: () => false, + }); + + await orchestrator.startRun(conversation.id, 'Hello', undefined, { + eventSink: sink, + }); + await new Promise((r) => setTimeout(r, 50)); + + // The exception message should be forwarded to the event sink + expect(receivedErrors).toEqual(['Unexpected agent crash']); + }); + test('no events forwarded when eventSink is not provided', async () => { const conversation = createConversation('no sink test'); const deltaMsg: ServerMessage = { diff --git a/assistant/src/calls/call-controller.ts b/assistant/src/calls/call-controller.ts index 163959703f3..760a60ff004 100644 --- a/assistant/src/calls/call-controller.ts +++ b/assistant/src/calls/call-controller.ts @@ -263,6 +263,7 @@ export class CallController { if (this.durationWarningTimer) clearTimeout(this.durationWarningTimer); if (this.consultationTimer) clearTimeout(this.consultationTimer); if (this.durationEndTimer) { clearTimeout(this.durationEndTimer); this.durationEndTimer = null; } + this.llmRunVersion++; this.abortCurrentTurn(); unregisterCallController(this.callSessionId); log.info({ callSessionId: this.callSessionId }, 'CallController destroyed'); @@ -398,6 +399,11 @@ export class CallController { }).catch((err) => { reject(err); }); + + // Defensive: if the turn is aborted (e.g. barge-in) and the event + // sink callbacks are never invoked, resolve the promise so it + // doesn't hang forever. + runSignal.addEventListener('abort', () => { resolve(); }, { once: true }); }); await turnComplete; diff --git a/assistant/src/runtime/run-orchestrator.ts b/assistant/src/runtime/run-orchestrator.ts index 39a6a6907b4..ef838e1bd06 100644 --- a/assistant/src/runtime/run-orchestrator.ts +++ b/assistant/src/runtime/run-orchestrator.ts @@ -312,6 +312,10 @@ export class RunOrchestrator { eventSink.onTextDelta(msg.text); } else if (msg.type === 'message_complete') { eventSink.onMessageComplete(); + } else if (msg.type === 'generation_cancelled') { + // Treat cancellation as a completed turn so the voice + // turnComplete promise settles instead of hanging forever. + eventSink.onMessageComplete(); } else if (msg.type === 'error') { eventSink.onError(msg.message); } else if (msg.type === 'session_error') { @@ -331,6 +335,11 @@ export class RunOrchestrator { const message = err instanceof Error ? err.message : String(err); log.error({ err, runId: run.id }, 'Run failed'); runsStore.failRun(run.id, message); + // Notify the voice event sink so the caller's turnComplete + // promise settles instead of hanging on unhandled exceptions. + if (eventSink) { + eventSink.onError(message); + } } finally { cleanup(); }