Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 79 additions & 2 deletions assistant/src/__tests__/call-controller.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -542,8 +542,10 @@ describe('call-controller', () => {

opts.signal?.addEventListener('abort', () => {
clearTimeout(timeout);
const err = new Error('aborted');
err.name = 'AbortError';
// In the real system, generation_cancelled triggers
// onComplete via the event sink. The AbortSignal listener
// in call-controller also resolves turnComplete defensively.
opts.onComplete();
resolve({ runId: 'run-1', abort: () => {} });
}, { once: true });
});
Expand All @@ -561,6 +563,39 @@ describe('call-controller', () => {
controller.destroy();
});

test('handleInterrupt: turnComplete settles even when event sink callbacks are not called', async () => {
// Simulate a turn that never calls onComplete or onError on abort —
// the defensive AbortSignal listener in runTurn() should settle the promise.
mockStartVoiceTurn.mockImplementation(async (opts: { signal?: AbortSignal; onTextDelta: (t: string) => void; onComplete: () => void }) => {
return new Promise((resolve) => {
const timeout = setTimeout(() => {
opts.onTextDelta('Long running turn');
opts.onComplete();
resolve({ runId: 'run-1', abort: () => {} });
}, 5000);

opts.signal?.addEventListener('abort', () => {
clearTimeout(timeout);
// Intentionally do NOT call onComplete — simulates the old
// broken path where generation_cancelled was not forwarded.
resolve({ runId: 'run-1', abort: () => {} });
}, { once: true });
});
});

const { controller } = setupController();
const turnPromise = controller.handleCallerUtterance('Start speaking');
await new Promise((r) => setTimeout(r, 5));
controller.handleInterrupt();

// Should not hang — the AbortSignal listener resolves the promise
await turnPromise;

expect(controller.getState()).toBe('idle');

controller.destroy();
});

// ── destroy ───────────────────────────────────────────────────────

test('destroy: unregisters controller', () => {
Expand All @@ -583,6 +618,48 @@ describe('call-controller', () => {
expect(() => controller.destroy()).not.toThrow();
});

test('destroy: during active turn does not trigger post-turn side effects', async () => {
// Simulate a turn that completes after destroy() is called
mockStartVoiceTurn.mockImplementation(async (opts: { signal?: AbortSignal; onTextDelta: (t: string) => void; onComplete: () => void }) => {
return new Promise((resolve) => {
const timeout = setTimeout(() => {
opts.onTextDelta('This is a long response');
opts.onComplete();
resolve({ runId: 'run-1', abort: () => {} });
}, 1000);

opts.signal?.addEventListener('abort', () => {
clearTimeout(timeout);
// The defensive abort listener in runTurn resolves turnComplete
opts.onComplete();
resolve({ runId: 'run-1', abort: () => {} });
}, { once: true });
});
});

const { relay, controller } = setupController();
const turnPromise = controller.handleCallerUtterance('Start speaking');

// Let the turn start
await new Promise((r) => setTimeout(r, 5));

// Destroy the controller while the turn is active
controller.destroy();

// Wait for the turn to settle
await turnPromise;

// Verify that NO spurious post-turn side effects occurred after destroy:
// - No final empty-string sendTextToken('', true) call after abort
// The only end marker should be from handleInterrupt, not from post-turn logic
const endMarkers = relay.sentTokens.filter((t) => t.token === '' && t.last === true);

// destroy() increments llmRunVersion, so isCurrentRun() returns false
// for the aborted turn, preventing post-turn side effects including
// the spurious relay.sendTextToken('', true) on line 418.
expect(endMarkers.length).toBe(0);
});

// ── handleUserInstruction ─────────────────────────────────────────

test('handleUserInstruction: injects instruction marker and triggers turn when idle', async () => {
Expand Down
78 changes: 78 additions & 0 deletions assistant/src/__tests__/run-orchestrator.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,84 @@ describe('eventSink forwarding', () => {
expect(receivedTools[0].input).toEqual({ query: 'test' });
});

test('eventSink receives onMessageComplete on generation_cancelled', async () => {
const conversation = createConversation('event sink cancelled test');
const cancelledMsg: ServerMessage = {
type: 'generation_cancelled',
sessionId: conversation.id,
};
const session = makeSessionWithEvent(cancelledMsg);

let messageCompleteCount = 0;
const receivedErrors: string[] = [];
const sink: VoiceRunEventSink = {
onTextDelta: () => {},
onMessageComplete: () => { messageCompleteCount++; },
onError: (msg) => receivedErrors.push(msg),
onToolUse: () => {},
};

const orchestrator = new RunOrchestrator({
getOrCreateSession: async () => session,
resolveAttachments: () => [],
deriveDefaultStrictSideEffects: () => false,
});

await orchestrator.startRun(conversation.id, 'Hello', undefined, {
eventSink: sink,
});
await new Promise((r) => setTimeout(r, 50));

// generation_cancelled should be forwarded as onMessageComplete
expect(messageCompleteCount).toBe(1);
// It should NOT trigger onError
expect(receivedErrors).toHaveLength(0);
});

test('eventSink receives onError when runAgentLoop throws', async () => {
const conversation = createConversation('event sink exception test');

// Build a session whose runAgentLoop throws an exception instead of
// emitting events — simulating an unhandled crash in the agent loop.
const session = {
isProcessing: () => false,
persistUserMessage: () => undefined as unknown as string,
memoryPolicy: { scopeId: 'default', includeDefaultFallback: false, strictSideEffects: false },
setChannelCapabilities: () => {},
setAssistantId: () => {},
setGuardianContext: () => {},
setCommandIntent: () => {},
setTurnChannelContext: () => {},
updateClient: () => {},
runAgentLoop: async () => {
throw new Error('Unexpected agent crash');
},
handleConfirmationResponse: () => {},
} as unknown as Session;

const receivedErrors: string[] = [];
const sink: VoiceRunEventSink = {
onTextDelta: () => {},
onMessageComplete: () => {},
onError: (msg) => receivedErrors.push(msg),
onToolUse: () => {},
};

const orchestrator = new RunOrchestrator({
getOrCreateSession: async () => session,
resolveAttachments: () => [],
deriveDefaultStrictSideEffects: () => false,
});

await orchestrator.startRun(conversation.id, 'Hello', undefined, {
eventSink: sink,
});
await new Promise((r) => setTimeout(r, 50));

// The exception message should be forwarded to the event sink
expect(receivedErrors).toEqual(['Unexpected agent crash']);
});

test('no events forwarded when eventSink is not provided', async () => {
const conversation = createConversation('no sink test');
const deltaMsg: ServerMessage = {
Expand Down
6 changes: 6 additions & 0 deletions assistant/src/calls/call-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ export class CallController {
if (this.durationWarningTimer) clearTimeout(this.durationWarningTimer);
if (this.consultationTimer) clearTimeout(this.consultationTimer);
if (this.durationEndTimer) { clearTimeout(this.durationEndTimer); this.durationEndTimer = null; }
this.llmRunVersion++;
this.abortCurrentTurn();
unregisterCallController(this.callSessionId);
log.info({ callSessionId: this.callSessionId }, 'CallController destroyed');
Expand Down Expand Up @@ -398,6 +399,11 @@ export class CallController {
}).catch((err) => {
reject(err);
});

// Defensive: if the turn is aborted (e.g. barge-in) and the event
// sink callbacks are never invoked, resolve the promise so it
// doesn't hang forever.
runSignal.addEventListener('abort', () => { resolve(); }, { once: true });
Comment thread
noanflaherty marked this conversation as resolved.
Comment thread
noanflaherty marked this conversation as resolved.
});

await turnComplete;
Expand Down
9 changes: 9 additions & 0 deletions assistant/src/runtime/run-orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,10 @@ export class RunOrchestrator {
eventSink.onTextDelta(msg.text);
} else if (msg.type === 'message_complete') {
eventSink.onMessageComplete();
} else if (msg.type === 'generation_cancelled') {
// Treat cancellation as a completed turn so the voice
// turnComplete promise settles instead of hanging forever.
eventSink.onMessageComplete();
} else if (msg.type === 'error') {
eventSink.onError(msg.message);
} else if (msg.type === 'session_error') {
Expand All @@ -331,6 +335,11 @@ export class RunOrchestrator {
const message = err instanceof Error ? err.message : String(err);
log.error({ err, runId: run.id }, 'Run failed');
runsStore.failRun(run.id, message);
// Notify the voice event sink so the caller's turnComplete
// promise settles instead of hanging on unhandled exceptions.
if (eventSink) {
eventSink.onError(message);
}
} finally {
cleanup();
}
Expand Down