diff --git a/assistant/src/__tests__/call-controller.test.ts b/assistant/src/__tests__/call-controller.test.ts index d1f3f760d50..9022b1b3e60 100644 --- a/assistant/src/__tests__/call-controller.test.ts +++ b/assistant/src/__tests__/call-controller.test.ts @@ -96,7 +96,7 @@ function createMockVoiceTurn(tokens: string[]) { } return { - runId: `run-${Date.now()}`, + turnId: `run-${Date.now()}`, abort: () => {}, }; }; @@ -109,7 +109,7 @@ mock.module('../calls/voice-session-bridge.js', () => { mockStartVoiceTurn = mock(createMockVoiceTurn(['Hello', ' there'])); return { startVoiceTurn: (...args: unknown[]) => mockStartVoiceTurn(...args), - setVoiceBridgeOrchestrator: () => {}, + setVoiceBridgeDeps: () => {}, }; }); @@ -266,7 +266,7 @@ describe('call-controller', () => { expect(opts.content).toContain('Can you summarize this meeting?'); opts.onTextDelta('Sure, here is a summary.'); opts.onComplete(); - return { runId: 'run-1', abort: () => {} }; + return { turnId: 'run-1', abort: () => {} }; }); const { controller } = setupController(); @@ -291,7 +291,7 @@ describe('call-controller', () => { opts.onTextDelta(token); } opts.onComplete(); - return { runId: 'run-1', abort: () => {} }; + return { turnId: 'run-1', abort: () => {} }; }); const { relay, controller } = setupController('Confirm appointment'); @@ -331,7 +331,7 @@ describe('call-controller', () => { opts.onTextDelta(token); } opts.onComplete(); - return { runId: `run-${turnCount}`, abort: () => {} }; + return { turnId: `run-${turnCount}`, abort: () => {} }; }); const { controller } = setupController('Tell a joke immediately'); @@ -441,7 +441,7 @@ describe('call-controller', () => { opts.onTextDelta(token); } opts.onComplete(); - return { runId: 'run-2', abort: () => {} }; + return { turnId: 'run-2', abort: () => {} }; }); const accepted = await controller.handleUserAnswer('3pm tomorrow'); @@ -507,7 +507,7 @@ describe('call-controller', () => { test('Voice turn error: sends error message to caller and returns to idle', async () => { mockStartVoiceTurn.mockImplementation(async (opts: { onError: (msg: string) => void }) => { opts.onError('API rate limit exceeded'); - return { runId: 'run-err', abort: () => {} }; + return { turnId: 'run-err', abort: () => {} }; }); const { relay, controller } = setupController(); @@ -554,7 +554,7 @@ describe('call-controller', () => { const timeout = setTimeout(() => { opts.onTextDelta('This should be interrupted'); opts.onComplete(); - resolve({ runId: 'run-1', abort: () => {} }); + resolve({ turnId: 'run-1', abort: () => {} }); }, 1000); opts.signal?.addEventListener('abort', () => { @@ -563,7 +563,7 @@ describe('call-controller', () => { // onComplete via the event sink. The AbortSignal listener // in call-controller also resolves turnComplete defensively. opts.onComplete(); - resolve({ runId: 'run-1', abort: () => {} }); + resolve({ turnId: 'run-1', abort: () => {} }); }, { once: true }); }); }); @@ -588,14 +588,14 @@ describe('call-controller', () => { const timeout = setTimeout(() => { opts.onTextDelta('Long running turn'); opts.onComplete(); - resolve({ runId: 'run-1', abort: () => {} }); + resolve({ turnId: 'run-1', abort: () => {} }); }, 5000); opts.signal?.addEventListener('abort', () => { clearTimeout(timeout); // Intentionally do NOT call onComplete — simulates the old // broken path where generation_cancelled was not forwarded. - resolve({ runId: 'run-1', abort: () => {} }); + resolve({ turnId: 'run-1', abort: () => {} }); }, { once: true }); }); }); @@ -633,7 +633,7 @@ describe('call-controller', () => { capturedGuardianContext = opts.guardianContext; opts.onTextDelta('Hello.'); opts.onComplete(); - return { runId: 'run-gc', abort: () => {} }; + return { turnId: 'run-gc', abort: () => {} }; }); const { controller } = setupController(undefined, { guardianContext: guardianCtx }); @@ -655,7 +655,7 @@ describe('call-controller', () => { capturedAssistantId = opts.assistantId; opts.onTextDelta('Hello.'); opts.onComplete(); - return { runId: 'run-aid', abort: () => {} }; + return { turnId: 'run-aid', abort: () => {} }; }); const { controller } = setupController(undefined, { assistantId: 'my-assistant' }); @@ -690,7 +690,7 @@ describe('call-controller', () => { capturedContexts.push(opts.guardianContext); opts.onTextDelta('Response.'); opts.onComplete(); - return { runId: `run-${capturedContexts.length}`, abort: () => {} }; + return { turnId: `run-${capturedContexts.length}`, abort: () => {} }; }); const { controller } = setupController(undefined, { guardianContext: initialCtx }); @@ -738,14 +738,14 @@ describe('call-controller', () => { const timeout = setTimeout(() => { opts.onTextDelta('This is a long response'); opts.onComplete(); - resolve({ runId: 'run-1', abort: () => {} }); + resolve({ turnId: 'run-1', abort: () => {} }); }, 1000); opts.signal?.addEventListener('abort', () => { clearTimeout(timeout); // The defensive abort listener in runTurn resolves turnComplete opts.onComplete(); - resolve({ runId: 'run-1', abort: () => {} }); + resolve({ turnId: 'run-1', abort: () => {} }); }, { once: true }); }); }); @@ -783,7 +783,7 @@ describe('call-controller', () => { opts.onTextDelta(token); } opts.onComplete(); - return { runId: 'run-instr', abort: () => {} }; + return { turnId: 'run-instr', abort: () => {} }; }); const { relay, controller } = setupController(); @@ -831,7 +831,7 @@ describe('call-controller', () => { turnCallCount++; opts.onTextDelta('Should not appear.'); opts.onComplete(); - return { runId: 'run-blocked', abort: () => {} }; + return { turnId: 'run-blocked', abort: () => {} }; }); // Caller speaks while waiting — should be queued, not processed @@ -864,7 +864,7 @@ describe('call-controller', () => { opts.onTextDelta('Got it, 4pm.'); } opts.onComplete(); - return { runId: `run-${turnContents.length}`, abort: () => {} }; + return { turnId: `run-${turnContents.length}`, abort: () => {} }; }); const accepted = await controller.handleUserAnswer('Yes, confirmed'); @@ -897,7 +897,7 @@ describe('call-controller', () => { // Simulate the model trying to emit another ASK_GUARDIAN opts.onTextDelta('[ASK_GUARDIAN: Preferred date again?]'); opts.onComplete(); - return { runId: 'run-dup', abort: () => {} }; + return { turnId: 'run-dup', abort: () => {} }; }); // Multiple caller utterances during waiting_on_user — all should be queued @@ -924,7 +924,7 @@ describe('call-controller', () => { await new Promise((r) => setTimeout(r, 200)); opts.onTextDelta('Response.'); opts.onComplete(); - return { runId: 'run-proc', abort: () => {} }; + return { turnId: 'run-proc', abort: () => {} }; }); const turnPromise = controller.handleCallerUtterance('Test'); // Give it a moment to enter processing state @@ -952,7 +952,7 @@ describe('call-controller', () => { turnCallCount++; opts.onTextDelta('Response after instruction.'); opts.onComplete(); - return { runId: 'run-2', abort: () => {} }; + return { turnId: 'run-2', abort: () => {} }; }); // Inject instruction while in waiting_on_user state @@ -989,7 +989,7 @@ describe('call-controller', () => { turnContents.push(opts.content); opts.onTextDelta('Alright, your appointment is cancelled. Goodbye! [END_CALL]'); opts.onComplete(); - return { runId: `run-${turnContents.length}`, abort: () => {} }; + return { turnId: `run-${turnContents.length}`, abort: () => {} }; }); const accepted = await controller.handleUserAnswer('Yes, cancel it'); @@ -1035,7 +1035,7 @@ describe('call-controller', () => { turnContents.push(opts.content); opts.onTextDelta('Got it, let me check 10am availability.'); opts.onComplete(); - return { runId: `run-${turnContents.length}`, abort: () => {} }; + return { turnId: `run-${turnContents.length}`, abort: () => {} }; }); // Wait for the short consultation timeout to fire diff --git a/assistant/src/__tests__/channel-approval.test.ts b/assistant/src/__tests__/channel-approval.test.ts index 2cdb796cd32..f559a9985de 100644 --- a/assistant/src/__tests__/channel-approval.test.ts +++ b/assistant/src/__tests__/channel-approval.test.ts @@ -1,100 +1,9 @@ -import { mkdtempSync, rmSync } from 'node:fs'; -import { tmpdir } from 'node:os'; -import { join } from 'node:path'; +import { describe, expect, test } from 'bun:test'; -import { afterAll, beforeEach, describe, expect, mock,test } from 'bun:test'; - -// --------------------------------------------------------------------------- -// Test isolation: in-memory SQLite via temp directory -// --------------------------------------------------------------------------- - -const testDir = mkdtempSync(join(tmpdir(), 'channel-approval-test-')); - -mock.module('../util/platform.js', () => ({ - getDataDir: () => testDir, - isMacOS: () => process.platform === 'darwin', - isLinux: () => process.platform === 'linux', - isWindows: () => process.platform === 'win32', - getSocketPath: () => join(testDir, 'test.sock'), - getPidPath: () => join(testDir, 'test.pid'), - getDbPath: () => join(testDir, 'test.db'), - getLogPath: () => join(testDir, 'test.log'), - ensureDataDir: () => {}, -})); - -mock.module('../util/logger.js', () => ({ - getLogger: () => new Proxy({} as Record, { - get: () => () => {}, - }), -})); - -import { initializeDb, resetDb } from '../memory/db.js'; -import type { PendingConfirmation } from '../memory/runs-store.js'; -import { - createRun, - getPendingConfirmationsByConversation, - setRunConfirmation, -} from '../memory/runs-store.js'; import { parseApprovalDecision } from '../runtime/channel-approval-parser.js'; -initializeDb(); - -afterAll(() => { - resetDb(); - try { rmSync(testDir, { recursive: true }); } catch { /* best effort */ } -}); - -// --------------------------------------------------------------------------- -// Helper: insert a conversation so FK constraints pass -// --------------------------------------------------------------------------- - -function ensureConversation(conversationId: string): void { - // eslint-disable-next-line @typescript-eslint/no-require-imports - const { getDb } = require('../memory/db.js'); - const db = getDb(); - // eslint-disable-next-line @typescript-eslint/no-require-imports - const { conversations } = require('../memory/schema.js'); - try { - db.insert(conversations).values({ - id: conversationId, - createdAt: Date.now(), - updatedAt: Date.now(), - }).run(); - } catch { - // already exists - } -} - -function ensureMessage(messageId: string, conversationId: string): void { - // eslint-disable-next-line @typescript-eslint/no-require-imports - const { getDb } = require('../memory/db.js'); - const db = getDb(); - // eslint-disable-next-line @typescript-eslint/no-require-imports - const { messages } = require('../memory/schema.js'); - try { - db.insert(messages).values({ - id: messageId, - conversationId, - role: 'user', - content: 'test', - createdAt: Date.now(), - }).run(); - } catch { - // already exists - } -} - -function resetTables(): void { - // eslint-disable-next-line @typescript-eslint/no-require-imports - const { getDb } = require('../memory/db.js'); - const db = getDb(); - db.run('DELETE FROM message_runs'); - db.run('DELETE FROM messages'); - db.run('DELETE FROM conversations'); -} - // ═══════════════════════════════════════════════════════════════════════════ -// 1. Plain-text approval decision parser +// Plain-text approval decision parser // ═══════════════════════════════════════════════════════════════════════════ describe('parseApprovalDecision', () => { @@ -234,106 +143,3 @@ describe('parseApprovalDecision', () => { expect(result!.requestId).toBeUndefined(); }); }); - -// ═══════════════════════════════════════════════════════════════════════════ -// 2. Pending-run lookup helpers -// ═══════════════════════════════════════════════════════════════════════════ - -describe('getPendingConfirmationsByConversation', () => { - beforeEach(() => { - resetTables(); - }); - - const sampleConfirmation: PendingConfirmation = { - toolName: 'shell', - toolUseId: 'req-abc-123', - input: { command: 'rm -rf /tmp/test' }, - riskLevel: 'high', - }; - - test('returns empty array when no runs exist', () => { - ensureConversation('conv-1'); - const result = getPendingConfirmationsByConversation('conv-1'); - expect(result).toEqual([]); - }); - - test('returns empty array when no runs need confirmation', () => { - ensureConversation('conv-1'); - ensureMessage('msg-1', 'conv-1'); - createRun('conv-1', 'msg-1'); - const result = getPendingConfirmationsByConversation('conv-1'); - expect(result).toEqual([]); - }); - - test('returns pending confirmation for a run needing confirmation', () => { - ensureConversation('conv-1'); - ensureMessage('msg-1', 'conv-1'); - const run = createRun('conv-1', 'msg-1'); - setRunConfirmation(run.id, sampleConfirmation); - - const result = getPendingConfirmationsByConversation('conv-1'); - expect(result).toHaveLength(1); - expect(result[0].runId).toBe(run.id); - expect(result[0].requestId).toBe('req-abc-123'); - expect(result[0].toolName).toBe('shell'); - expect(result[0].input).toEqual({ command: 'rm -rf /tmp/test' }); - expect(result[0].riskLevel).toBe('high'); - }); - - test('only returns runs for the specified conversation', () => { - ensureConversation('conv-1'); - ensureConversation('conv-2'); - ensureMessage('msg-1', 'conv-1'); - ensureMessage('msg-2', 'conv-2'); - - const run1 = createRun('conv-1', 'msg-1'); - const run2 = createRun('conv-2', 'msg-2'); - setRunConfirmation(run1.id, sampleConfirmation); - setRunConfirmation(run2.id, { ...sampleConfirmation, toolUseId: 'req-def-456' }); - - const result1 = getPendingConfirmationsByConversation('conv-1'); - expect(result1).toHaveLength(1); - expect(result1[0].runId).toBe(run1.id); - - const result2 = getPendingConfirmationsByConversation('conv-2'); - expect(result2).toHaveLength(1); - expect(result2[0].runId).toBe(run2.id); - }); - - test('returns multiple pending runs for the same conversation', () => { - ensureConversation('conv-1'); - ensureMessage('msg-1', 'conv-1'); - ensureMessage('msg-2', 'conv-1'); - - const run1 = createRun('conv-1', 'msg-1'); - const run2 = createRun('conv-1', 'msg-2'); - setRunConfirmation(run1.id, sampleConfirmation); - setRunConfirmation(run2.id, { ...sampleConfirmation, toolUseId: 'req-ghi-789', toolName: 'file_edit' }); - - const result = getPendingConfirmationsByConversation('conv-1'); - expect(result).toHaveLength(2); - - const runIds = result.map((r) => r.runId).sort(); - expect(runIds).toContain(run1.id); - expect(runIds).toContain(run2.id); - }); - - test('excludes completed and failed runs', () => { - ensureConversation('conv-1'); - ensureMessage('msg-1', 'conv-1'); - ensureMessage('msg-2', 'conv-1'); - ensureMessage('msg-3', 'conv-1'); - - const run1 = createRun('conv-1', 'msg-1'); - const _run2 = createRun('conv-1', 'msg-2'); - const _run3 = createRun('conv-1', 'msg-3'); - - setRunConfirmation(run1.id, sampleConfirmation); - // run2 stays in 'running' state - // run3 gets confirmation then completes — simulated by not setting confirmation - - const result = getPendingConfirmationsByConversation('conv-1'); - expect(result).toHaveLength(1); - expect(result[0].runId).toBe(run1.id); - }); -}); diff --git a/assistant/src/__tests__/run-orchestrator-assistant-events.test.ts b/assistant/src/__tests__/run-orchestrator-assistant-events.test.ts deleted file mode 100644 index cdf681db721..00000000000 --- a/assistant/src/__tests__/run-orchestrator-assistant-events.test.ts +++ /dev/null @@ -1,235 +0,0 @@ -/** - * Tests that HTTP-triggered run/session flows mirror messages into the - * assistant-events hub with payload parity to IPC outbound messages. - * - * The Session class has two distinct outbound paths: - * 1. updateClient handler — used by the prompter for confirmation_request, - * trace emitter, secret prompter. - * 2. runAgentLoop onEvent callback — used for the primary streaming events: - * assistant_text_delta, message_complete, tool_use_start, tool_result, etc. - * - * Both paths must publish to the hub. - * - * Tests: - * - confirmation_request (updateClient path) → hub emits one AssistantEvent - * - assistant_text_delta + message_complete (onEvent path) → hub emits in order - * - sessionId falls back to conversationId when the message lacks it - */ -import { mkdtempSync } from 'node:fs'; -import { tmpdir } from 'node:os'; -import { join } from 'node:path'; - -import { afterAll, beforeEach, describe, expect, mock,test } from 'bun:test'; - -import type { ServerMessage } from '../daemon/ipc-protocol.js'; -import type { Session } from '../daemon/session.js'; -import type { AssistantEvent } from '../runtime/assistant-event.js'; - -const testDir = mkdtempSync(join(tmpdir(), 'run-orch-hub-test-')); - -mock.module('../util/platform.js', () => ({ - getRootDir: () => testDir, - getDataDir: () => testDir, - isMacOS: () => process.platform === 'darwin', - isLinux: () => process.platform === 'linux', - isWindows: () => process.platform === 'win32', - getSocketPath: () => join(testDir, 'test.sock'), - getPidPath: () => join(testDir, 'test.pid'), - getDbPath: () => join(testDir, 'test.db'), - getLogPath: () => join(testDir, 'test.log'), - ensureDataDir: () => {}, -})); - -mock.module('../util/logger.js', () => ({ - getLogger: () => new Proxy({} as Record, { - get: () => () => {}, - }), -})); - -import { createConversation } from '../memory/conversation-store.js'; -import { getDb, initializeDb, resetDb } from '../memory/db.js'; -import { assistantEventHub } from '../runtime/assistant-event-hub.js'; -import { RunOrchestrator } from '../runtime/run-orchestrator.js'; - -initializeDb(); - -afterAll(() => { - resetDb(); -}); - -// ── Helpers ────────────────────────────────────────────────────────────────── - -/** - * Build a session that calls the updateClient handler with the given messages - * (simulates prompter / confirmation path). - */ -function makeSessionEmittingViaClient(...messages: ServerMessage[]): Session { - let clientHandler: (msg: ServerMessage) => void = () => {}; - return { - isProcessing: () => false, - persistUserMessage: () => undefined as unknown as string, - memoryPolicy: { scopeId: 'default', includeDefaultFallback: false, strictSideEffects: false }, - setChannelCapabilities: () => {}, - setAssistantId: () => {}, - setGuardianContext: () => {}, - setCommandIntent: () => {}, - updateClient: (handler: (msg: ServerMessage) => void) => { - clientHandler = handler; - }, - runAgentLoop: async () => { - for (const msg of messages) { - clientHandler(msg); - } - }, - handleConfirmationResponse: () => {}, - } as unknown as Session; -} - -/** - * Build a session that calls the onEvent callback with the given messages - * (simulates the primary agent-loop streaming path). - */ -function makeSessionEmittingViaAgentLoop(...messages: ServerMessage[]): Session { - return { - isProcessing: () => false, - persistUserMessage: () => undefined as unknown as string, - memoryPolicy: { scopeId: 'default', includeDefaultFallback: false, strictSideEffects: false }, - setChannelCapabilities: () => {}, - setAssistantId: () => {}, - setGuardianContext: () => {}, - setCommandIntent: () => {}, - updateClient: () => {}, - runAgentLoop: async (_content: string, _messageId: string, onEvent: (msg: ServerMessage) => void) => { - for (const msg of messages) { - onEvent(msg); - } - }, - handleConfirmationResponse: () => {}, - } as unknown as Session; -} - -// ── Tests ───────────────────────────────────────────────────────────────────── - -describe('HTTP run → confirmation_request mirrors to assistant-events hub', () => { - beforeEach(() => { - const db = getDb(); - db.run('DELETE FROM message_runs'); - db.run('DELETE FROM messages'); - db.run('DELETE FROM conversations'); - }); - - test('confirmation_request (updateClient path) emits one AssistantEvent', async () => { - const conversation = createConversation('http-confirmation-test'); - const confirmationMsg: ServerMessage = { - type: 'confirmation_request', - requestId: 'req-http-1', - toolName: 'bash', - input: { command: 'ls' }, - riskLevel: 'medium', - allowlistOptions: [{ label: 'ls', description: 'List files', pattern: 'ls' }], - scopeOptions: [{ label: 'everywhere', scope: 'everywhere' }], - }; - const session = makeSessionEmittingViaClient(confirmationMsg); - - const received: AssistantEvent[] = []; - const sub = assistantEventHub.subscribe( - { assistantId: 'self' }, - (e) => { received.push(e); }, - ); - - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => session, - resolveAttachments: () => [], - deriveDefaultStrictSideEffects: () => false, - }); - - await orchestrator.startRun(conversation.id, 'Do something'); - // Wait for the async hub chain to flush. - await new Promise((r) => setTimeout(r, 20)); - - sub.dispose(); - - expect(received).toHaveLength(1); - expect(received[0].assistantId).toBe('self'); - expect(received[0].sessionId).toBe(conversation.id); - expect(received[0].message.type).toBe('confirmation_request'); - expect(received[0].message).toBe(confirmationMsg); - }); -}); - -describe('HTTP run → message flow mirrors to assistant-events hub', () => { - beforeEach(() => { - const db = getDb(); - db.run('DELETE FROM message_runs'); - db.run('DELETE FROM messages'); - db.run('DELETE FROM conversations'); - }); - - test('assistant_text_delta and message_complete (onEvent path) emit in order', async () => { - const conversation = createConversation('http-message-flow-test'); - const deltaMsg: ServerMessage = { - type: 'assistant_text_delta', - sessionId: conversation.id, - text: 'Working on it...', - }; - const completeMsg: ServerMessage = { - type: 'message_complete', - sessionId: conversation.id, - }; - const session = makeSessionEmittingViaAgentLoop(deltaMsg, completeMsg); - - const received: AssistantEvent[] = []; - const sub = assistantEventHub.subscribe( - { assistantId: 'self' }, - (e) => { received.push(e); }, - ); - - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => session, - resolveAttachments: () => [], - deriveDefaultStrictSideEffects: () => false, - }); - - await orchestrator.startRun(conversation.id, 'Hello'); - await new Promise((r) => setTimeout(r, 20)); - - sub.dispose(); - - expect(received).toHaveLength(2); - expect(received[0].message.type).toBe('assistant_text_delta'); - expect(received[1].message.type).toBe('message_complete'); - // Both should carry the session id - expect(received[0].sessionId).toBe(conversation.id); - expect(received[1].sessionId).toBe(conversation.id); - // Messages are the unmodified originals - expect(received[0].message).toBe(deltaMsg); - expect(received[1].message).toBe(completeMsg); - }); - - test('sessionId falls back to conversationId when message lacks it (onEvent path)', async () => { - const conversation = createConversation('http-session-fallback-test'); - // pong has no sessionId field - const msg: ServerMessage = { type: 'pong' }; - const session = makeSessionEmittingViaAgentLoop(msg); - - const received: AssistantEvent[] = []; - const sub = assistantEventHub.subscribe( - { assistantId: 'self' }, - (e) => { received.push(e); }, - ); - - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => session, - resolveAttachments: () => [], - deriveDefaultStrictSideEffects: () => false, - }); - - await orchestrator.startRun(conversation.id, 'ping'); - await new Promise((r) => setTimeout(r, 20)); - - sub.dispose(); - - expect(received).toHaveLength(1); - expect(received[0].sessionId).toBe(conversation.id); - }); -}); diff --git a/assistant/src/__tests__/run-orchestrator.test.ts b/assistant/src/__tests__/run-orchestrator.test.ts deleted file mode 100644 index 3d5f9eee0cf..00000000000 --- a/assistant/src/__tests__/run-orchestrator.test.ts +++ /dev/null @@ -1,861 +0,0 @@ -import { mkdtempSync, rmSync } from 'node:fs'; -import { tmpdir } from 'node:os'; -import { join } from 'node:path'; - -import { afterAll, beforeEach, describe, expect, mock,test } from 'bun:test'; - -import type { ServerMessage } from '../daemon/ipc-protocol.js'; -import type { Session } from '../daemon/session.js'; - -const testDir = mkdtempSync(join(tmpdir(), 'run-orchestrator-test-')); - -mock.module('../util/platform.js', () => ({ - getRootDir: () => testDir, - getDataDir: () => testDir, - isMacOS: () => process.platform === 'darwin', - isLinux: () => process.platform === 'linux', - isWindows: () => process.platform === 'win32', - getSocketPath: () => join(testDir, 'test.sock'), - getPidPath: () => join(testDir, 'test.pid'), - getDbPath: () => join(testDir, 'test.db'), - getLogPath: () => join(testDir, 'test.log'), - ensureDataDir: () => {}, -})); - -mock.module('../util/logger.js', () => ({ - getLogger: () => new Proxy({} as Record, { - get: () => () => {}, - }), -})); - -mock.module('../config/loader.js', () => ({ - getConfig: () => ({ - secretDetection: { enabled: false }, - }), -})); - -import type { ChannelCapabilities } from '../daemon/session-runtime-assembly.js'; -import { createConversation } from '../memory/conversation-store.js'; -import { getDb, initializeDb, resetDb } from '../memory/db.js'; -import { createRun, getRun, setRunConfirmation } from '../memory/runs-store.js'; -import type { VoiceRunEventSink } from '../runtime/run-orchestrator.js'; -import { RunOrchestrator } from '../runtime/run-orchestrator.js'; - -initializeDb(); - -function makeSessionWithConfirmation(message: ServerMessage): Session { - let clientHandler: (msg: ServerMessage) => void = () => {}; - return { - isProcessing: () => false, - // Return undefined so createRun stores messageId as null and avoids - // a foreign-key dependency on the conversation-store message table. - persistUserMessage: () => undefined as unknown as string, - memoryPolicy: { scopeId: 'default', includeDefaultFallback: false, strictSideEffects: false }, - setChannelCapabilities: () => {}, - setAssistantId: () => {}, - setGuardianContext: () => {}, - setCommandIntent: () => {}, - setTurnChannelContext: () => {}, - setVoiceCallControlPrompt: () => {}, - updateClient: (handler: (msg: ServerMessage) => void) => { - clientHandler = handler; - }, - runAgentLoop: async () => { - clientHandler(message); - return await new Promise(() => {}); - }, - handleConfirmationResponse: () => {}, - } as unknown as Session; -} - -/** - * Build a session whose runAgentLoop emits the given message via the onEvent - * callback and then resolves (simulating a completed agent loop). - */ -function makeSessionWithEvent(message: ServerMessage): Session { - return { - isProcessing: () => false, - persistUserMessage: () => undefined as unknown as string, - memoryPolicy: { scopeId: 'default', includeDefaultFallback: false, strictSideEffects: false }, - setChannelCapabilities: () => {}, - setAssistantId: () => {}, - setGuardianContext: () => {}, - setCommandIntent: () => {}, - setTurnChannelContext: () => {}, - setVoiceCallControlPrompt: () => {}, - updateClient: () => {}, - runAgentLoop: async (_content: string, _messageId: string, onEvent: (msg: ServerMessage) => void) => { - onEvent(message); - }, - handleConfirmationResponse: () => {}, - } as unknown as Session; -} - -describe('run failure detection', () => { - beforeEach(() => { - const db = getDb(); - db.run('DELETE FROM message_runs'); - db.run('DELETE FROM messages'); - db.run('DELETE FROM conversations'); - }); - - test('session_error event marks the run as failed', async () => { - const conversation = createConversation('session error test'); - const session = makeSessionWithEvent({ - type: 'session_error', - sessionId: conversation.id, - code: 'PROVIDER_NETWORK', - userMessage: 'Unable to reach the AI provider.', - retryable: true, - }); - - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => session, - resolveAttachments: () => [], - deriveDefaultStrictSideEffects: () => false, - }); - - const { run } = await orchestrator.startRun(conversation.id, 'Hello'); - - // The agent loop fires asynchronously; give it a tick to settle. - await new Promise((r) => setTimeout(r, 50)); - - const stored = orchestrator.getRun(run.id); - expect(stored?.status).toBe('failed'); - expect(stored?.error).toBe('Unable to reach the AI provider.'); - }); - - test('generic error event still marks the run as failed', async () => { - const conversation = createConversation('generic error test'); - const session = makeSessionWithEvent({ - type: 'error', - message: 'Something went wrong', - }); - - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => session, - resolveAttachments: () => [], - deriveDefaultStrictSideEffects: () => false, - }); - - const { run } = await orchestrator.startRun(conversation.id, 'Hello'); - - await new Promise((r) => setTimeout(r, 50)); - - const stored = orchestrator.getRun(run.id); - expect(stored?.status).toBe('failed'); - expect(stored?.error).toBe('Something went wrong'); - }); -}); - -afterAll(() => { - resetDb(); - try { rmSync(testDir, { recursive: true, force: true }); } catch { /* best effort */ } -}); - -describe('run approval state executionTarget', () => { - beforeEach(() => { - const db = getDb(); - db.run('DELETE FROM message_runs'); - db.run('DELETE FROM messages'); - db.run('DELETE FROM conversations'); - }); - - test('stores pending confirmation executionTarget when provided', () => { - const conversation = createConversation('run test'); - const run = createRun(conversation.id); - - setRunConfirmation(run.id, { - toolName: 'host_file_read', - toolUseId: 'req-1', - input: { path: '/etc/hosts' }, - riskLevel: 'medium', - executionTarget: 'host', - allowlistOptions: [{ label: '/etc/hosts', pattern: 'host_file_read:/etc/hosts' }], - scopeOptions: [{ label: 'everywhere', scope: 'everywhere' }], - }); - - const stored = getRun(run.id); - expect(stored?.status).toBe('needs_confirmation'); - expect(stored?.pendingConfirmation?.executionTarget).toBe('host'); - }); - - test('parses pending confirmations without executionTarget for legacy rows', () => { - const conversation = createConversation('legacy run test'); - const run = createRun(conversation.id); - - setRunConfirmation(run.id, { - toolName: 'bash', - toolUseId: 'req-legacy', - input: { command: 'ls' }, - riskLevel: 'medium', - allowlistOptions: [{ label: 'ls', pattern: 'ls' }], - scopeOptions: [{ label: '/tmp', scope: '/tmp' }], - }); - - const stored = getRun(run.id); - expect(stored?.status).toBe('needs_confirmation'); - expect(stored?.pendingConfirmation?.executionTarget).toBeUndefined(); - }); - - test('run orchestrator persists executionTarget from confirmation_request', async () => { - const conversation = createConversation('orchestrator run test'); - const session = makeSessionWithConfirmation({ - type: 'confirmation_request', - requestId: 'req-2', - toolName: 'host_bash', - input: { command: 'pwd' }, - riskLevel: 'medium', - executionTarget: 'host', - allowlistOptions: [{ label: 'pwd', description: 'This exact command', pattern: 'pwd' }], - scopeOptions: [{ label: 'everywhere', scope: 'everywhere' }], - }); - - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => session, - resolveAttachments: () => [], - deriveDefaultStrictSideEffects: () => false, - }); - - const { run } = await orchestrator.startRun(conversation.id, 'Run host command'); - const stored = orchestrator.getRun(run.id); - expect(stored?.status).toBe('needs_confirmation'); - expect(stored?.pendingConfirmation?.executionTarget).toBe('host'); - }); -}); - -// ═══════════════════════════════════════════════════════════════════════════ -// Channel capability resolution via sourceChannel (WS-D) -// ═══════════════════════════════════════════════════════════════════════════ - -describe('startRun channel capability resolution', () => { - beforeEach(() => { - const db = getDb(); - db.run('DELETE FROM message_runs'); - db.run('DELETE FROM messages'); - db.run('DELETE FROM conversations'); - }); - - test('resolves channel capabilities from provided sourceChannel', async () => { - const conversation = createConversation('telegram channel test'); - let capturedCapabilities: ChannelCapabilities | null = null; - - const session = { - isProcessing: () => false, - persistUserMessage: () => undefined as unknown as string, - memoryPolicy: {}, - setChannelCapabilities: (caps: ChannelCapabilities | null) => { - if (caps) capturedCapabilities = caps; - }, - setAssistantId: () => {}, - setGuardianContext: () => {}, - setCommandIntent: () => {}, - setTurnChannelContext: () => {}, - setVoiceCallControlPrompt: () => {}, - updateClient: () => {}, - runAgentLoop: async () => {}, - handleConfirmationResponse: () => {}, - } as unknown as Session; - - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => session, - resolveAttachments: () => [], - deriveDefaultStrictSideEffects: () => false, - }); - - await orchestrator.startRun(conversation.id, 'Hello from Telegram', undefined, { - sourceChannel: 'telegram', - }); - - // Wait for the async agent loop to settle - await new Promise((r) => setTimeout(r, 50)); - - expect(capturedCapabilities).not.toBeNull(); - expect(capturedCapabilities!.channel).toBe('telegram'); - expect(capturedCapabilities!.dashboardCapable).toBe(false); - }); - - test('defaults to macos (from http-api fallback) when no sourceChannel is provided', async () => { - const conversation = createConversation('http-api default test'); - let capturedCapabilities: ChannelCapabilities | null = null; - - const session = { - isProcessing: () => false, - persistUserMessage: () => undefined as unknown as string, - memoryPolicy: {}, - setChannelCapabilities: (caps: ChannelCapabilities | null) => { - if (caps) capturedCapabilities = caps; - }, - setAssistantId: () => {}, - setGuardianContext: () => {}, - setCommandIntent: () => {}, - setTurnChannelContext: () => {}, - setVoiceCallControlPrompt: () => {}, - updateClient: () => {}, - runAgentLoop: async () => {}, - handleConfirmationResponse: () => {}, - } as unknown as Session; - - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => session, - resolveAttachments: () => [], - deriveDefaultStrictSideEffects: () => false, - }); - - await orchestrator.startRun(conversation.id, 'Hello from HTTP'); - - await new Promise((r) => setTimeout(r, 50)); - - expect(capturedCapabilities).not.toBeNull(); - expect(capturedCapabilities!.channel).toBe('vellum'); - }); - - test('defaults to vellum when options are provided without sourceChannel', async () => { - const conversation = createConversation('options no channel test'); - let capturedCapabilities: ChannelCapabilities | null = null; - - const session = { - isProcessing: () => false, - persistUserMessage: () => undefined as unknown as string, - memoryPolicy: {}, - setChannelCapabilities: (caps: ChannelCapabilities | null) => { - if (caps) capturedCapabilities = caps; - }, - setAssistantId: () => {}, - setGuardianContext: () => {}, - setCommandIntent: () => {}, - setTurnChannelContext: () => {}, - setVoiceCallControlPrompt: () => {}, - updateClient: () => {}, - runAgentLoop: async () => {}, - handleConfirmationResponse: () => {}, - } as unknown as Session; - - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => session, - resolveAttachments: () => [], - deriveDefaultStrictSideEffects: () => false, - }); - - await orchestrator.startRun(conversation.id, 'Hello with options', undefined, { - forceStrictSideEffects: true, - }); - - await new Promise((r) => setTimeout(r, 50)); - - expect(capturedCapabilities).not.toBeNull(); - expect(capturedCapabilities!.channel).toBe('vellum'); - }); -}); - -// ═══════════════════════════════════════════════════════════════════════════ -// strictSideEffects re-derivation prevents stale flag across runs -// ═══════════════════════════════════════════════════════════════════════════ - -describe('strictSideEffects re-derivation across runs', () => { - beforeEach(() => { - const db = getDb(); - db.run('DELETE FROM message_runs'); - db.run('DELETE FROM messages'); - db.run('DELETE FROM conversations'); - }); - - test('forceStrictSideEffects=true does not persist to subsequent run without override', async () => { - const conversation = createConversation('stale strict test'); - - // Shared session simulating a cached session reused across runs - const session = { - isProcessing: () => false, - persistUserMessage: () => undefined as unknown as string, - memoryPolicy: { scopeId: 'default', includeDefaultFallback: false, strictSideEffects: false }, - setChannelCapabilities: () => {}, - setAssistantId: () => {}, - setGuardianContext: () => {}, - setCommandIntent: () => {}, - setTurnChannelContext: () => {}, - setVoiceCallControlPrompt: () => {}, - updateClient: () => {}, - runAgentLoop: async () => {}, - handleConfirmationResponse: () => {}, - } as unknown as Session; - - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => session, - resolveAttachments: () => [], - deriveDefaultStrictSideEffects: () => false, - }); - - // First run: force strict mode on - await orchestrator.startRun(conversation.id, 'non-guardian message', undefined, { - forceStrictSideEffects: true, - }); - await new Promise((r) => setTimeout(r, 50)); - - expect((session as unknown as { memoryPolicy: { strictSideEffects: boolean } }).memoryPolicy.strictSideEffects).toBe(true); - - // Second run: no override — should reset to derived default (false) - await orchestrator.startRun(conversation.id, 'guardian message'); - await new Promise((r) => setTimeout(r, 50)); - - expect((session as unknown as { memoryPolicy: { strictSideEffects: boolean } }).memoryPolicy.strictSideEffects).toBe(false); - }); - - test('private thread re-derives strictSideEffects=true when no override', async () => { - const conversation = createConversation('private thread strict test'); - - const session = { - isProcessing: () => false, - persistUserMessage: () => undefined as unknown as string, - memoryPolicy: { scopeId: 'private-scope', includeDefaultFallback: true, strictSideEffects: true }, - setChannelCapabilities: () => {}, - setAssistantId: () => {}, - setGuardianContext: () => {}, - setCommandIntent: () => {}, - setTurnChannelContext: () => {}, - setVoiceCallControlPrompt: () => {}, - updateClient: () => {}, - runAgentLoop: async () => {}, - handleConfirmationResponse: () => {}, - } as unknown as Session; - - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => session, - resolveAttachments: () => [], - // Simulate private thread → default is true - deriveDefaultStrictSideEffects: () => true, - }); - - // Run with explicit false override - await orchestrator.startRun(conversation.id, 'override to false', undefined, { - forceStrictSideEffects: false, - }); - await new Promise((r) => setTimeout(r, 50)); - - expect((session as unknown as { memoryPolicy: { strictSideEffects: boolean } }).memoryPolicy.strictSideEffects).toBe(false); - - // Run without override — should re-derive to true (private thread) - await orchestrator.startRun(conversation.id, 'no override'); - await new Promise((r) => setTimeout(r, 50)); - - expect((session as unknown as { memoryPolicy: { strictSideEffects: boolean } }).memoryPolicy.strictSideEffects).toBe(true); - }); - - test('explicit forceStrictSideEffects=false sets strict to false', async () => { - const conversation = createConversation('explicit false test'); - - const session = { - isProcessing: () => false, - persistUserMessage: () => undefined as unknown as string, - memoryPolicy: { scopeId: 'default', includeDefaultFallback: false, strictSideEffects: true }, - setChannelCapabilities: () => {}, - setAssistantId: () => {}, - setGuardianContext: () => {}, - setCommandIntent: () => {}, - setTurnChannelContext: () => {}, - setVoiceCallControlPrompt: () => {}, - updateClient: () => {}, - runAgentLoop: async () => {}, - handleConfirmationResponse: () => {}, - } as unknown as Session; - - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => session, - resolveAttachments: () => [], - deriveDefaultStrictSideEffects: () => true, - }); - - await orchestrator.startRun(conversation.id, 'force off', undefined, { - forceStrictSideEffects: false, - }); - await new Promise((r) => setTimeout(r, 50)); - - expect((session as unknown as { memoryPolicy: { strictSideEffects: boolean } }).memoryPolicy.strictSideEffects).toBe(false); - }); -}); - -// ═══════════════════════════════════════════════════════════════════════════ -// VoiceRunEventSink forwarding -// ═══════════════════════════════════════════════════════════════════════════ - -describe('eventSink forwarding', () => { - beforeEach(() => { - const db = getDb(); - db.run('DELETE FROM message_runs'); - db.run('DELETE FROM messages'); - db.run('DELETE FROM conversations'); - }); - - test('eventSink receives assistant_text_delta events', async () => { - const conversation = createConversation('event sink delta test'); - const deltaMsg: ServerMessage = { - type: 'assistant_text_delta', - text: 'Hello from agent', - sessionId: conversation.id, - }; - const session = makeSessionWithEvent(deltaMsg); - - const receivedDeltas: string[] = []; - const sink: VoiceRunEventSink = { - onTextDelta: (text) => receivedDeltas.push(text), - onMessageComplete: () => {}, - onError: () => {}, - onToolUse: () => {}, - }; - - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => session, - resolveAttachments: () => [], - deriveDefaultStrictSideEffects: () => false, - }); - - await orchestrator.startRun(conversation.id, 'Hello', undefined, { - eventSink: sink, - }); - await new Promise((r) => setTimeout(r, 50)); - - expect(receivedDeltas).toEqual(['Hello from agent']); - }); - - test('eventSink receives error events', async () => { - const conversation = createConversation('event sink error test'); - const errMsg: ServerMessage = { - type: 'error', - message: 'Something broke', - }; - const session = makeSessionWithEvent(errMsg); - - const receivedErrors: string[] = []; - const sink: VoiceRunEventSink = { - onTextDelta: () => {}, - onMessageComplete: () => {}, - onError: (msg) => receivedErrors.push(msg), - onToolUse: () => {}, - }; - - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => session, - resolveAttachments: () => [], - deriveDefaultStrictSideEffects: () => false, - }); - - await orchestrator.startRun(conversation.id, 'Hello', undefined, { - eventSink: sink, - }); - await new Promise((r) => setTimeout(r, 50)); - - expect(receivedErrors).toEqual(['Something broke']); - }); - - test('eventSink receives tool_use_start events', async () => { - const conversation = createConversation('event sink tool test'); - const toolMsg: ServerMessage = { - type: 'tool_use_start', - toolName: 'web_search', - input: { query: 'test' }, - sessionId: conversation.id, - }; - const session = makeSessionWithEvent(toolMsg); - - const receivedTools: Array<{ name: string; input: Record }> = []; - const sink: VoiceRunEventSink = { - onTextDelta: () => {}, - onMessageComplete: () => {}, - onError: () => {}, - onToolUse: (name, input) => receivedTools.push({ name, input }), - }; - - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => session, - resolveAttachments: () => [], - deriveDefaultStrictSideEffects: () => false, - }); - - await orchestrator.startRun(conversation.id, 'Hello', undefined, { - eventSink: sink, - }); - await new Promise((r) => setTimeout(r, 50)); - - expect(receivedTools).toHaveLength(1); - expect(receivedTools[0].name).toBe('web_search'); - expect(receivedTools[0].input).toEqual({ query: 'test' }); - }); - - test('eventSink receives onMessageComplete on generation_cancelled', async () => { - const conversation = createConversation('event sink cancelled test'); - const cancelledMsg: ServerMessage = { - type: 'generation_cancelled', - sessionId: conversation.id, - }; - const session = makeSessionWithEvent(cancelledMsg); - - let messageCompleteCount = 0; - const receivedErrors: string[] = []; - const sink: VoiceRunEventSink = { - onTextDelta: () => {}, - onMessageComplete: () => { messageCompleteCount++; }, - onError: (msg) => receivedErrors.push(msg), - onToolUse: () => {}, - }; - - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => session, - resolveAttachments: () => [], - deriveDefaultStrictSideEffects: () => false, - }); - - await orchestrator.startRun(conversation.id, 'Hello', undefined, { - eventSink: sink, - }); - await new Promise((r) => setTimeout(r, 50)); - - // generation_cancelled should be forwarded as onMessageComplete - expect(messageCompleteCount).toBe(1); - // It should NOT trigger onError - expect(receivedErrors).toHaveLength(0); - }); - - test('eventSink receives onError when runAgentLoop throws', async () => { - const conversation = createConversation('event sink exception test'); - - // Build a session whose runAgentLoop throws an exception instead of - // emitting events — simulating an unhandled crash in the agent loop. - const session = { - isProcessing: () => false, - persistUserMessage: () => undefined as unknown as string, - memoryPolicy: { scopeId: 'default', includeDefaultFallback: false, strictSideEffects: false }, - setChannelCapabilities: () => {}, - setAssistantId: () => {}, - setGuardianContext: () => {}, - setCommandIntent: () => {}, - setTurnChannelContext: () => {}, - setVoiceCallControlPrompt: () => {}, - updateClient: () => {}, - runAgentLoop: async () => { - throw new Error('Unexpected agent crash'); - }, - handleConfirmationResponse: () => {}, - } as unknown as Session; - - const receivedErrors: string[] = []; - const sink: VoiceRunEventSink = { - onTextDelta: () => {}, - onMessageComplete: () => {}, - onError: (msg) => receivedErrors.push(msg), - onToolUse: () => {}, - }; - - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => session, - resolveAttachments: () => [], - deriveDefaultStrictSideEffects: () => false, - }); - - await orchestrator.startRun(conversation.id, 'Hello', undefined, { - eventSink: sink, - }); - await new Promise((r) => setTimeout(r, 50)); - - // The exception message should be forwarded to the event sink - expect(receivedErrors).toEqual(['Unexpected agent crash']); - }); - - test('no events forwarded when eventSink is not provided', async () => { - const conversation = createConversation('no sink test'); - const deltaMsg: ServerMessage = { - type: 'assistant_text_delta', - text: 'Hello', - sessionId: conversation.id, - }; - const session = makeSessionWithEvent(deltaMsg); - - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => session, - resolveAttachments: () => [], - deriveDefaultStrictSideEffects: () => false, - }); - - // Should not throw when no eventSink is provided - const { run } = await orchestrator.startRun(conversation.id, 'Hello'); - await new Promise((r) => setTimeout(r, 50)); - - const stored = orchestrator.getRun(run.id); - expect(stored?.status).toBe('completed'); - }); -}); - -// ═══════════════════════════════════════════════════════════════════════════ -// Run abort / cancellation -// ═══════════════════════════════════════════════════════════════════════════ - -describe('run abort', () => { - beforeEach(() => { - const db = getDb(); - db.run('DELETE FROM message_runs'); - db.run('DELETE FROM messages'); - db.run('DELETE FROM conversations'); - }); - - test('startRun returns an abort function', async () => { - const conversation = createConversation('abort handle test'); - const session = { - isProcessing: () => false, - currentRequestId: undefined as string | undefined, - persistUserMessage: (_c: string, _a: unknown[], reqId: string) => { - session.currentRequestId = reqId; - return undefined as unknown as string; - }, - memoryPolicy: { scopeId: 'default', includeDefaultFallback: false, strictSideEffects: false }, - setChannelCapabilities: () => {}, - setAssistantId: () => {}, - setGuardianContext: () => {}, - setCommandIntent: () => {}, - setTurnChannelContext: () => {}, - setVoiceCallControlPrompt: () => {}, - updateClient: () => {}, - runAgentLoop: async () => {}, - handleConfirmationResponse: () => {}, - abort: () => {}, - } as unknown as Session; - - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => session, - resolveAttachments: () => [], - deriveDefaultStrictSideEffects: () => false, - }); - - const handle = await orchestrator.startRun(conversation.id, 'Hello'); - expect(typeof handle.abort).toBe('function'); - expect(handle.run.id).toBeDefined(); - }); - - test('aborting a run does not crash session state', async () => { - const conversation = createConversation('abort safety test'); - let abortCalled = false; - - const session = { - isProcessing: () => false, - currentRequestId: undefined as string | undefined, - persistUserMessage: (_c: string, _a: unknown[], reqId: string) => { - session.currentRequestId = reqId; - return undefined as unknown as string; - }, - memoryPolicy: { scopeId: 'default', includeDefaultFallback: false, strictSideEffects: false }, - setChannelCapabilities: () => {}, - setAssistantId: () => {}, - setGuardianContext: () => {}, - setCommandIntent: () => {}, - setTurnChannelContext: () => {}, - setVoiceCallControlPrompt: () => {}, - updateClient: () => {}, - runAgentLoop: async () => { - // Simulate a long-running agent loop - await new Promise((r) => setTimeout(r, 200)); - }, - handleConfirmationResponse: () => {}, - abort: () => { abortCalled = true; }, - } as unknown as Session; - - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => session, - resolveAttachments: () => [], - deriveDefaultStrictSideEffects: () => false, - }); - - const handle = await orchestrator.startRun(conversation.id, 'Hello'); - - // Abort immediately — session still has same requestId - handle.abort(); - expect(abortCalled).toBe(true); - - // Wait for cleanup to settle - await new Promise((r) => setTimeout(r, 300)); - - // Session state should not be corrupted — the run completes normally - // since the mock runAgentLoop resolves after 200ms regardless. - const stored = orchestrator.getRun(handle.run.id); - expect(stored).not.toBeNull(); - }); - - test('stale abort handle is a no-op when session has moved to a new run', async () => { - const conversation = createConversation('stale abort test'); - let abortCalled = false; - - const session = { - isProcessing: () => false, - currentRequestId: undefined as string | undefined, - persistUserMessage: (_c: string, _a: unknown[], reqId: string) => { - session.currentRequestId = reqId; - return undefined as unknown as string; - }, - memoryPolicy: { scopeId: 'default', includeDefaultFallback: false, strictSideEffects: false }, - setChannelCapabilities: () => {}, - setAssistantId: () => {}, - setGuardianContext: () => {}, - setCommandIntent: () => {}, - setTurnChannelContext: () => {}, - setVoiceCallControlPrompt: () => {}, - updateClient: () => {}, - runAgentLoop: async () => {}, - handleConfirmationResponse: () => {}, - abort: () => { abortCalled = true; }, - } as unknown as Session; - - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => session, - resolveAttachments: () => [], - deriveDefaultStrictSideEffects: () => false, - }); - - // Start first run and capture its handle - const handle1 = await orchestrator.startRun(conversation.id, 'First turn'); - await new Promise((r) => setTimeout(r, 50)); - - // Start second run — session's currentRequestId now belongs to run 2 - const _handle2 = await orchestrator.startRun(conversation.id, 'Second turn'); - - // Attempt to abort using the stale handle from run 1. - // Since the session has moved to a new requestId, this should be a no-op. - handle1.abort(); - expect(abortCalled).toBe(false); - }); - - test('abort works when session still has matching requestId', async () => { - const conversation = createConversation('matching abort test'); - let abortCalled = false; - - const session = { - isProcessing: () => false, - currentRequestId: undefined as string | undefined, - persistUserMessage: (_c: string, _a: unknown[], reqId: string) => { - session.currentRequestId = reqId; - return undefined as unknown as string; - }, - memoryPolicy: { scopeId: 'default', includeDefaultFallback: false, strictSideEffects: false }, - setChannelCapabilities: () => {}, - setAssistantId: () => {}, - setGuardianContext: () => {}, - setCommandIntent: () => {}, - setTurnChannelContext: () => {}, - setVoiceCallControlPrompt: () => {}, - updateClient: () => {}, - runAgentLoop: async () => { - // Keep the agent loop running so the session stays on this requestId - await new Promise((r) => setTimeout(r, 500)); - }, - handleConfirmationResponse: () => {}, - abort: () => { abortCalled = true; }, - } as unknown as Session; - - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => session, - resolveAttachments: () => [], - deriveDefaultStrictSideEffects: () => false, - }); - - const handle = await orchestrator.startRun(conversation.id, 'Hello'); - - // Abort while the session is still processing this run - handle.abort(); - expect(abortCalled).toBe(true); - }); -}); diff --git a/assistant/src/__tests__/runtime-runs-http.test.ts b/assistant/src/__tests__/runtime-runs-http.test.ts deleted file mode 100644 index 344e32b6256..00000000000 --- a/assistant/src/__tests__/runtime-runs-http.test.ts +++ /dev/null @@ -1,527 +0,0 @@ -/** - * HTTP-layer integration tests for the run API endpoints. - * - * Tests POST /runs, GET /runs/:id, and POST /runs/:id/decision - * through RuntimeHttpServer with a real RunOrchestrator instance. - */ -import { mkdtempSync, realpathSync,rmSync } from 'node:fs'; -import { tmpdir } from 'node:os'; -import { join } from 'node:path'; - -import { afterAll, beforeEach, describe, expect, mock,test } from 'bun:test'; - -import type { ServerMessage } from '../daemon/ipc-protocol.js'; -import type { Session } from '../daemon/session.js'; - -const testDir = realpathSync(mkdtempSync(join(tmpdir(), 'runtime-runs-http-test-'))); - -mock.module('../util/platform.js', () => ({ - getRootDir: () => testDir, - getDataDir: () => testDir, - isMacOS: () => process.platform === 'darwin', - isLinux: () => process.platform === 'linux', - isWindows: () => process.platform === 'win32', - getSocketPath: () => join(testDir, 'test.sock'), - getPidPath: () => join(testDir, 'test.pid'), - getDbPath: () => join(testDir, 'test.db'), - getLogPath: () => join(testDir, 'test.log'), - ensureDataDir: () => {}, -})); - -mock.module('../util/logger.js', () => ({ - getLogger: () => new Proxy({} as Record, { - get: () => () => {}, - }), -})); - -mock.module('../config/loader.js', () => ({ - getConfig: () => ({ - model: 'test', - provider: 'test', - apiKeys: {}, - memory: { enabled: false }, - rateLimit: { maxRequestsPerMinute: 0, maxTokensPerSession: 0 }, - secretDetection: { enabled: false }, - }), -})); - -import { getDb, initializeDb, resetDb } from '../memory/db.js'; -import { RuntimeHttpServer } from '../runtime/http-server.js'; -import { RunOrchestrator } from '../runtime/run-orchestrator.js'; - -initializeDb(); - -// --------------------------------------------------------------------------- -// Session helpers -// --------------------------------------------------------------------------- - -function makeCompletingSession(): Session { - let processing = false; - return { - isProcessing: () => processing, - persistUserMessage: () => undefined as unknown as string, - memoryPolicy: { scopeId: 'default', includeDefaultFallback: false, strictSideEffects: false }, - setChannelCapabilities: () => {}, - setAssistantId: () => {}, - setGuardianContext: () => {}, - setCommandIntent: () => {}, - setTurnChannelContext: () => {}, - setTurnInterfaceContext: () => {}, - setVoiceCallControlPrompt: () => {}, - updateClient: () => {}, - runAgentLoop: async () => { - processing = true; - await new Promise((r) => setTimeout(r, 20)); - processing = false; - }, - handleConfirmationResponse: () => {}, - handleSecretResponse: () => {}, - } as unknown as Session; -} - -function makeFailingSession(errorMsg: string): Session { - return { - isProcessing: () => false, - persistUserMessage: () => undefined as unknown as string, - memoryPolicy: { scopeId: 'default', includeDefaultFallback: false, strictSideEffects: false }, - setChannelCapabilities: () => {}, - setAssistantId: () => {}, - setGuardianContext: () => {}, - setCommandIntent: () => {}, - setTurnChannelContext: () => {}, - setTurnInterfaceContext: () => {}, - setVoiceCallControlPrompt: () => {}, - updateClient: () => {}, - runAgentLoop: async (_content: string, _messageId: string, onEvent: (msg: ServerMessage) => void) => { - onEvent({ type: 'error', message: errorMsg }); - }, - handleConfirmationResponse: () => {}, - handleSecretResponse: () => {}, - } as unknown as Session; -} - -function makeConfirmationSession(toolName: string): Session { - let clientHandler: (msg: ServerMessage) => void = () => {}; - return { - isProcessing: () => false, - persistUserMessage: () => undefined as unknown as string, - memoryPolicy: { scopeId: 'default', includeDefaultFallback: false, strictSideEffects: false }, - setChannelCapabilities: () => {}, - setAssistantId: () => {}, - setGuardianContext: () => {}, - setCommandIntent: () => {}, - setTurnChannelContext: () => {}, - setTurnInterfaceContext: () => {}, - setVoiceCallControlPrompt: () => {}, - updateClient: (handler: (msg: ServerMessage) => void) => { - clientHandler = handler; - }, - runAgentLoop: async () => { - clientHandler({ - type: 'confirmation_request', - requestId: 'req-1', - toolName, - input: { objective: 'test task' }, - riskLevel: 'medium', - allowlistOptions: [], - scopeOptions: [], - }); - // Hang to simulate waiting for decision - await new Promise(() => {}); - }, - handleConfirmationResponse: () => {}, - handleSecretResponse: () => {}, - } as unknown as Session; -} - -function makeHangingSession(): Session { - let processing = false; - return { - isProcessing: () => processing, - persistUserMessage: () => undefined as unknown as string, - memoryPolicy: { scopeId: 'default', includeDefaultFallback: false, strictSideEffects: false }, - setChannelCapabilities: () => {}, - setAssistantId: () => {}, - setGuardianContext: () => {}, - setCommandIntent: () => {}, - setTurnChannelContext: () => {}, - setTurnInterfaceContext: () => {}, - setVoiceCallControlPrompt: () => {}, - updateClient: () => {}, - runAgentLoop: async () => { - processing = true; - await new Promise(() => {}); - }, - handleConfirmationResponse: () => {}, - handleSecretResponse: () => {}, - } as unknown as Session; -} - -// --------------------------------------------------------------------------- -// Tests -// --------------------------------------------------------------------------- - -const TEST_TOKEN = 'test-bearer-token-runs'; -const AUTH_HEADERS = { Authorization: `Bearer ${TEST_TOKEN}` }; - -describe('runtime runs — HTTP layer', () => { - let server: RuntimeHttpServer; - let port: number; - - beforeEach(() => { - const db = getDb(); - db.run('DELETE FROM message_runs'); - db.run('DELETE FROM messages'); - db.run('DELETE FROM conversations'); - db.run('DELETE FROM conversation_keys'); - }); - - afterAll(() => { - resetDb(); - try { rmSync(testDir, { recursive: true, force: true }); } catch { /* best effort */ } - }); - - async function startServer(sessionFactory: () => Session): Promise<{ orchestrator: RunOrchestrator }> { - port = 18000 + Math.floor(Math.random() * 1000); - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => sessionFactory(), - resolveAttachments: () => [], - deriveDefaultStrictSideEffects: () => false, - }); - server = new RuntimeHttpServer({ port, bearerToken: TEST_TOKEN, runOrchestrator: orchestrator }); - await server.start(); - return { orchestrator }; - } - - async function stopServer(): Promise { - await server?.stop(); - } - - function runsUrl(path = ''): string { - return `http://127.0.0.1:${port}/v1/runs${path}`; - } - - // ── Auth ──────────────────────────────────────────────────────────── - - test('returns 401 when bearer token is missing', async () => { - await startServer(() => makeCompletingSession()); - - const res = await fetch(runsUrl(), { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ - conversationKey: 'conv-noauth', - content: 'Hi', - sourceChannel: 'vellum', - interface: 'macos', - }), - }); - - expect(res.status).toBe(401); - await stopServer(); - }); - - test('returns 401 when bearer token is wrong', async () => { - await startServer(() => makeCompletingSession()); - - const res = await fetch(runsUrl(), { - method: 'POST', - headers: { 'Content-Type': 'application/json', Authorization: 'Bearer wrong-token' }, - body: JSON.stringify({ - conversationKey: 'conv-badauth', - content: 'Hi', - sourceChannel: 'vellum', - interface: 'macos', - }), - }); - - expect(res.status).toBe(401); - await stopServer(); - }); - - test('healthz is accessible without auth and includes disk space info', async () => { - await startServer(() => makeCompletingSession()); - - const res = await fetch(`http://127.0.0.1:${port}/healthz`); - expect(res.status).toBe(200); - - const body = await res.json() as { status: string; disk: { path: string; totalMb: number; usedMb: number; freeMb: number } | null }; - expect(body.status).toBe('healthy'); - if (body.disk != null) { - expect(typeof body.disk.path).toBe('string'); - expect(body.disk.totalMb).toBeGreaterThan(0); - expect(body.disk.usedMb).toBeGreaterThanOrEqual(0); - expect(body.disk.freeMb).toBeGreaterThanOrEqual(0); - } - - await stopServer(); - }); - - // ── POST /runs ────────────────────────────────────────────────────── - - test('POST /runs creates a run and returns 201', async () => { - await startServer(() => makeCompletingSession()); - - const res = await fetch(runsUrl(), { - method: 'POST', - headers: { 'Content-Type': 'application/json', ...AUTH_HEADERS }, - body: JSON.stringify({ - conversationKey: 'conv-1', - content: 'Hello', - sourceChannel: 'vellum', - interface: 'macos', - }), - }); - const body = await res.json() as { id: string; status: string; messageId: string; createdAt: string }; - - expect(res.status).toBe(201); - expect(body.id).toBeDefined(); - expect(body.status).toBe('running'); - expect(body.messageId).toBeNull(); - expect(body.createdAt).toBeDefined(); - - await stopServer(); - }); - - test('POST /runs returns 400 when conversationKey missing', async () => { - await startServer(() => makeCompletingSession()); - - const res = await fetch(runsUrl(), { - method: 'POST', - headers: { 'Content-Type': 'application/json', ...AUTH_HEADERS }, - body: JSON.stringify({ - content: 'Hello', - sourceChannel: 'vellum', - interface: 'macos', - }), - }); - - expect(res.status).toBe(400); - const body = await res.json() as { error: string }; - expect(body.error).toContain('conversationKey'); - - await stopServer(); - }); - - test('POST /runs returns 400 when content is empty', async () => { - await startServer(() => makeCompletingSession()); - - const res = await fetch(runsUrl(), { - method: 'POST', - headers: { 'Content-Type': 'application/json', ...AUTH_HEADERS }, - body: JSON.stringify({ - conversationKey: 'conv-2', - content: '', - sourceChannel: 'vellum', - interface: 'macos', - }), - }); - - expect(res.status).toBe(400); - - await stopServer(); - }); - - test('POST /runs returns 409 when session busy', async () => { - const session = makeHangingSession(); - await startServer(() => session); - - // First run starts and hangs - const res1 = await fetch(runsUrl(), { - method: 'POST', - headers: { 'Content-Type': 'application/json', ...AUTH_HEADERS }, - body: JSON.stringify({ - conversationKey: 'conv-busy', - content: 'First', - sourceChannel: 'vellum', - interface: 'macos', - }), - }); - expect(res1.status).toBe(201); - - await new Promise((r) => setTimeout(r, 30)); - - // Second run should be rejected - const res2 = await fetch(runsUrl(), { - method: 'POST', - headers: { 'Content-Type': 'application/json', ...AUTH_HEADERS }, - body: JSON.stringify({ - conversationKey: 'conv-busy', - content: 'Second', - sourceChannel: 'vellum', - interface: 'macos', - }), - }); - expect(res2.status).toBe(409); - - await stopServer(); - }); - - // ── GET /runs/:id ─────────────────────────────────────────────────── - - test('GET /runs/:id returns run status', async () => { - await startServer(() => makeHangingSession()); - - const createRes = await fetch(runsUrl(), { - method: 'POST', - headers: { 'Content-Type': 'application/json', ...AUTH_HEADERS }, - body: JSON.stringify({ - conversationKey: 'conv-get', - content: 'Test', - sourceChannel: 'vellum', - interface: 'macos', - }), - }); - const { id } = await createRes.json() as { id: string }; - - const getRes = await fetch(runsUrl(`/${id}`), { headers: AUTH_HEADERS }); - const body = await getRes.json() as { id: string; status: string; messageId: string }; - - expect(getRes.status).toBe(200); - expect(body.id).toBe(id); - expect(body.status).toBe('running'); - - await stopServer(); - }); - - test('GET /runs/:id returns completed after agent loop finishes', async () => { - await startServer(() => makeCompletingSession()); - - const createRes = await fetch(runsUrl(), { - method: 'POST', - headers: { 'Content-Type': 'application/json', ...AUTH_HEADERS }, - body: JSON.stringify({ - conversationKey: 'conv-done', - content: 'Build it', - sourceChannel: 'vellum', - interface: 'macos', - }), - }); - const { id } = await createRes.json() as { id: string }; - - await new Promise((r) => setTimeout(r, 100)); - - const getRes = await fetch(runsUrl(`/${id}`), { headers: AUTH_HEADERS }); - const body = await getRes.json() as { id: string; status: string }; - - expect(getRes.status).toBe(200); - expect(body.status).toBe('completed'); - - await stopServer(); - }); - - test('GET /runs/:id returns failed with error', async () => { - await startServer(() => makeFailingSession('Backend error')); - - const createRes = await fetch(runsUrl(), { - method: 'POST', - headers: { 'Content-Type': 'application/json', ...AUTH_HEADERS }, - body: JSON.stringify({ - conversationKey: 'conv-fail', - content: 'Do it', - sourceChannel: 'vellum', - interface: 'macos', - }), - }); - const { id } = await createRes.json() as { id: string }; - - await new Promise((r) => setTimeout(r, 50)); - - const getRes = await fetch(runsUrl(`/${id}`), { headers: AUTH_HEADERS }); - const body = await getRes.json() as { id: string; status: string; error: string }; - - expect(getRes.status).toBe(200); - expect(body.status).toBe('failed'); - expect(body.error).toBe('Backend error'); - - await stopServer(); - }); - - test('GET /runs/:id returns 404 for unknown run', async () => { - await startServer(() => makeCompletingSession()); - - const res = await fetch(runsUrl('/nonexistent'), { headers: AUTH_HEADERS }); - expect(res.status).toBe(404); - - await stopServer(); - }); - - // ── POST /runs/:id/decision ───────────────────────────────────────── - - test('POST /runs/:id/decision returns accepted for pending confirmation', async () => { - await startServer(() => makeConfirmationSession('swarm_delegate')); - - const createRes = await fetch(runsUrl(), { - method: 'POST', - headers: { 'Content-Type': 'application/json', ...AUTH_HEADERS }, - body: JSON.stringify({ - conversationKey: 'conv-decide', - content: 'Approve', - sourceChannel: 'vellum', - interface: 'macos', - }), - }); - const { id } = await createRes.json() as { id: string }; - - await new Promise((r) => setTimeout(r, 50)); - - // Verify pending state via GET - const getRes = await fetch(runsUrl(`/${id}`), { headers: AUTH_HEADERS }); - const runBody = await getRes.json() as { status: string; pendingConfirmation: { toolName: string } }; - expect(runBody.status).toBe('needs_confirmation'); - expect(runBody.pendingConfirmation.toolName).toBe('swarm_delegate'); - - // Submit decision - const decisionRes = await fetch(runsUrl(`/${id}/decision`), { - method: 'POST', - headers: { 'Content-Type': 'application/json', ...AUTH_HEADERS }, - body: JSON.stringify({ decision: 'allow' }), - }); - const decisionBody = await decisionRes.json() as { accepted: boolean }; - - expect(decisionRes.status).toBe(200); - expect(decisionBody.accepted).toBe(true); - - await stopServer(); - }); - - test('POST /runs/:id/decision returns 400 for invalid decision', async () => { - await startServer(() => makeHangingSession()); - - const createRes = await fetch(runsUrl(), { - method: 'POST', - headers: { 'Content-Type': 'application/json', ...AUTH_HEADERS }, - body: JSON.stringify({ - conversationKey: 'conv-bad-dec', - content: 'Test', - sourceChannel: 'vellum', - interface: 'macos', - }), - }); - const { id } = await createRes.json() as { id: string }; - - const res = await fetch(runsUrl(`/${id}/decision`), { - method: 'POST', - headers: { 'Content-Type': 'application/json', ...AUTH_HEADERS }, - body: JSON.stringify({ decision: 'maybe' }), - }); - - expect(res.status).toBe(400); - - await stopServer(); - }); - - test('POST /runs/:id/decision returns 404 for unknown run', async () => { - await startServer(() => makeCompletingSession()); - - const res = await fetch(runsUrl('/nonexistent/decision'), { - method: 'POST', - headers: { 'Content-Type': 'application/json', ...AUTH_HEADERS }, - body: JSON.stringify({ decision: 'allow' }), - }); - - expect(res.status).toBe(404); - - await stopServer(); - }); -}); diff --git a/assistant/src/__tests__/runtime-runs.test.ts b/assistant/src/__tests__/runtime-runs.test.ts deleted file mode 100644 index efa8f52694a..00000000000 --- a/assistant/src/__tests__/runtime-runs.test.ts +++ /dev/null @@ -1,286 +0,0 @@ -/** - * Integration tests for the run API lifecycle with swarm tool behavior. - * - * Verifies: create run → poll status → completion/failure transitions, - * including queue behavior when a swarm is active. - */ -import { mkdtempSync, rmSync } from 'node:fs'; -import { tmpdir } from 'node:os'; -import { join } from 'node:path'; - -import { afterAll, beforeEach, describe, expect, mock,test } from 'bun:test'; - -import type { ServerMessage } from '../daemon/ipc-protocol.js'; -import type { Session } from '../daemon/session.js'; - -const testDir = mkdtempSync(join(tmpdir(), 'runtime-runs-test-')); - -mock.module('../util/platform.js', () => ({ - getRootDir: () => testDir, - getDataDir: () => testDir, - isMacOS: () => process.platform === 'darwin', - isLinux: () => process.platform === 'linux', - isWindows: () => process.platform === 'win32', - getSocketPath: () => join(testDir, 'test.sock'), - getPidPath: () => join(testDir, 'test.pid'), - getDbPath: () => join(testDir, 'test.db'), - getLogPath: () => join(testDir, 'test.log'), - ensureDataDir: () => {}, -})); - -mock.module('../util/logger.js', () => ({ - getLogger: () => new Proxy({} as Record, { - get: () => () => {}, - }), -})); - -import { createConversation } from '../memory/conversation-store.js'; -import { getDb, initializeDb, resetDb } from '../memory/db.js'; -import { RunOrchestrator } from '../runtime/run-orchestrator.js'; - -initializeDb(); - -// --------------------------------------------------------------------------- -// Session helpers -// --------------------------------------------------------------------------- - -/** Session whose agent loop completes immediately (success). */ -function makeCompletingSession(): Session { - let processing = false; - return { - isProcessing: () => processing, - persistUserMessage: () => undefined as unknown as string, - memoryPolicy: { scopeId: 'default', includeDefaultFallback: false, strictSideEffects: false }, - setChannelCapabilities: () => {}, - setAssistantId: () => {}, - setGuardianContext: () => {}, - setCommandIntent: () => {}, - setTurnChannelContext: () => {}, - setVoiceCallControlPrompt: () => {}, - updateClient: () => {}, - runAgentLoop: async () => { - processing = true; - // Simulate brief processing then complete - await new Promise((r) => setTimeout(r, 20)); - processing = false; - }, - handleConfirmationResponse: () => {}, - handleSecretResponse: () => {}, - } as unknown as Session; -} - -/** Session whose agent loop hangs (simulating a long-running swarm). */ -function makeHangingSession(): Session { - let processing = false; - return { - isProcessing: () => processing, - persistUserMessage: () => undefined as unknown as string, - memoryPolicy: { scopeId: 'default', includeDefaultFallback: false, strictSideEffects: false }, - setChannelCapabilities: () => {}, - setAssistantId: () => {}, - setGuardianContext: () => {}, - setCommandIntent: () => {}, - setTurnChannelContext: () => {}, - setVoiceCallControlPrompt: () => {}, - updateClient: () => {}, - runAgentLoop: async () => { - processing = true; - // Never resolves — simulates an active swarm - await new Promise(() => {}); - }, - handleConfirmationResponse: () => {}, - handleSecretResponse: () => {}, - } as unknown as Session; -} - -/** Session whose agent loop fails with an error event. */ -function makeFailingSession(errorMsg: string): Session { - return { - isProcessing: () => false, - persistUserMessage: () => undefined as unknown as string, - memoryPolicy: { scopeId: 'default', includeDefaultFallback: false, strictSideEffects: false }, - setChannelCapabilities: () => {}, - setAssistantId: () => {}, - setGuardianContext: () => {}, - setCommandIntent: () => {}, - setTurnChannelContext: () => {}, - setVoiceCallControlPrompt: () => {}, - updateClient: () => {}, - runAgentLoop: async (_content: string, _messageId: string, onEvent: (msg: ServerMessage) => void) => { - onEvent({ type: 'error', message: errorMsg }); - }, - handleConfirmationResponse: () => {}, - handleSecretResponse: () => {}, - } as unknown as Session; -} - -/** Session whose agent loop emits a confirmation_request. */ -function makeConfirmationSession(toolName: string): Session { - let clientHandler: (msg: ServerMessage) => void = () => {}; - return { - isProcessing: () => false, - persistUserMessage: () => undefined as unknown as string, - memoryPolicy: { scopeId: 'default', includeDefaultFallback: false, strictSideEffects: false }, - setChannelCapabilities: () => {}, - setAssistantId: () => {}, - setGuardianContext: () => {}, - setCommandIntent: () => {}, - setTurnChannelContext: () => {}, - setVoiceCallControlPrompt: () => {}, - updateClient: (handler: (msg: ServerMessage) => void) => { - clientHandler = handler; - }, - runAgentLoop: async () => { - clientHandler({ - type: 'confirmation_request', - requestId: 'req-1', - toolName, - input: { objective: 'test task' }, - riskLevel: 'medium', - allowlistOptions: [], - scopeOptions: [], - }); - // Hang to simulate waiting for decision - await new Promise(() => {}); - }, - handleConfirmationResponse: () => {}, - handleSecretResponse: () => {}, - } as unknown as Session; -} - -// --------------------------------------------------------------------------- -// Tests -// --------------------------------------------------------------------------- - -describe('runtime runs — swarm lifecycle', () => { - beforeEach(() => { - const db = getDb(); - db.run('DELETE FROM message_runs'); - db.run('DELETE FROM messages'); - db.run('DELETE FROM conversations'); - }); - - afterAll(() => { - resetDb(); - try { rmSync(testDir, { recursive: true, force: true }); } catch { /* best effort */ } - }); - - test('run transitions to completed after agent loop finishes', async () => { - const conversation = createConversation('run complete test'); - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => makeCompletingSession(), - resolveAttachments: () => [], - deriveDefaultStrictSideEffects: () => false, - }); - - const { run } = await orchestrator.startRun(conversation.id, 'Build a feature'); - expect(run.status).toBe('running'); - - // Wait for agent loop to complete - await new Promise((r) => setTimeout(r, 100)); - - const stored = orchestrator.getRun(run.id); - expect(stored?.status).toBe('completed'); - }); - - test('run transitions to failed when agent loop reports error', async () => { - const conversation = createConversation('run error test'); - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => makeFailingSession('Swarm backend unavailable'), - resolveAttachments: () => [], - deriveDefaultStrictSideEffects: () => false, - }); - - const { run } = await orchestrator.startRun(conversation.id, 'Run swarm'); - - await new Promise((r) => setTimeout(r, 50)); - - const stored = orchestrator.getRun(run.id); - expect(stored?.status).toBe('failed'); - expect(stored?.error).toBe('Swarm backend unavailable'); - }); - - test('run enters needs_confirmation when tool requires approval', async () => { - const conversation = createConversation('run confirmation test'); - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => makeConfirmationSession('swarm_delegate'), - resolveAttachments: () => [], - deriveDefaultStrictSideEffects: () => false, - }); - - const { run } = await orchestrator.startRun(conversation.id, 'Delegate a swarm task'); - - // Give agent loop time to emit confirmation_request - await new Promise((r) => setTimeout(r, 50)); - - const stored = orchestrator.getRun(run.id); - expect(stored?.status).toBe('needs_confirmation'); - expect(stored?.pendingConfirmation?.toolName).toBe('swarm_delegate'); - }); - - test('decision endpoint transitions run back to running', async () => { - const conversation = createConversation('run decision test'); - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => makeConfirmationSession('swarm_delegate'), - resolveAttachments: () => [], - deriveDefaultStrictSideEffects: () => false, - }); - - const { run } = await orchestrator.startRun(conversation.id, 'Run with approval'); - await new Promise((r) => setTimeout(r, 50)); - - // Verify pending state - const pending = orchestrator.getRun(run.id); - expect(pending?.status).toBe('needs_confirmation'); - - // Submit decision - const result = orchestrator.submitDecision(run.id, 'allow'); - expect(result).toBe('applied'); - - // Confirmation should be cleared - const after = orchestrator.getRun(run.id); - expect(after?.pendingConfirmation).toBeNull(); - }); - - test('second run on busy session is rejected', async () => { - const hangingSession = makeHangingSession(); - const conversation = createConversation('queue test'); - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => hangingSession, - resolveAttachments: () => [], - deriveDefaultStrictSideEffects: () => false, - }); - - // First run starts and hangs - await orchestrator.startRun(conversation.id, 'First run'); - await new Promise((r) => setTimeout(r, 20)); - - // Second run on the same session should be rejected - try { - await orchestrator.startRun(conversation.id, 'Second run'); - // Should not reach here - expect(true).toBe(false); - } catch (err) { - expect((err as Error).message).toContain('already processing'); - } - }); - - test('getRun returns null for nonexistent run', () => { - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => makeCompletingSession(), - resolveAttachments: () => [], - deriveDefaultStrictSideEffects: () => false, - }); - expect(orchestrator.getRun('nonexistent-id')).toBeNull(); - }); - - test('submitDecision returns run_not_found for unknown run', () => { - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => makeCompletingSession(), - resolveAttachments: () => [], - deriveDefaultStrictSideEffects: () => false, - }); - const result = orchestrator.submitDecision('nonexistent-id', 'allow'); - expect(result).toBe('run_not_found'); - }); -}); diff --git a/assistant/src/__tests__/session-queue.test.ts b/assistant/src/__tests__/session-queue.test.ts index 2b369464ef9..62079eecf91 100644 --- a/assistant/src/__tests__/session-queue.test.ts +++ b/assistant/src/__tests__/session-queue.test.ts @@ -495,7 +495,7 @@ describe('Session message queue', () => { expect(sessionErr).toBeDefined(); // Should also emit generic error for backward compatibility - // (RunOrchestrator relies on error events to detect failures) + // (callers rely on error events to detect failures) const genericErr = events.find((e) => e.type === 'error'); expect(genericErr).toBeDefined(); }); diff --git a/assistant/src/__tests__/voice-session-bridge.test.ts b/assistant/src/__tests__/voice-session-bridge.test.ts index 020ea333b76..976b1a5feee 100644 --- a/assistant/src/__tests__/voice-session-bridge.test.ts +++ b/assistant/src/__tests__/voice-session-bridge.test.ts @@ -40,10 +40,9 @@ mock.module('../config/loader.js', () => ({ }), })); -import { setVoiceBridgeOrchestrator, startVoiceTurn } from '../calls/voice-session-bridge.js'; +import { setVoiceBridgeDeps, startVoiceTurn } from '../calls/voice-session-bridge.js'; import { createConversation } from '../memory/conversation-store.js'; import { getDb, initializeDb, resetDb } from '../memory/db.js'; -import { RunOrchestrator } from '../runtime/run-orchestrator.js'; initializeDb(); @@ -73,15 +72,25 @@ function makeStreamingSession(events: ServerMessage[]): Session { } as unknown as Session; } +/** + * Helper to inject voice bridge deps with a given session factory. + */ +function injectDeps(sessionFactory: () => Session): void { + setVoiceBridgeDeps({ + getOrCreateSession: async () => sessionFactory(), + resolveAttachments: () => [], + deriveDefaultStrictSideEffects: () => false, + }); +} + describe('voice-session-bridge', () => { beforeEach(() => { const db = getDb(); - db.run('DELETE FROM message_runs'); db.run('DELETE FROM messages'); db.run('DELETE FROM conversations'); }); - test('throws when orchestrator not injected', async () => { + test('throws when deps not injected', async () => { // Reset the module-level orchestrator by re-calling with undefined // (we can't easily reset module state, so we test the fresh import path) // Instead, test that startVoiceTurn works after injection @@ -96,13 +105,7 @@ describe('voice-session-bridge', () => { { type: 'message_complete', sessionId: conversation.id }, ]; const session = makeStreamingSession(events); - - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => session, - resolveAttachments: () => [], - deriveDefaultStrictSideEffects: () => false, - }); - setVoiceBridgeOrchestrator(orchestrator); + injectDeps(() => session); const receivedDeltas: string[] = []; let completed = false; @@ -121,7 +124,7 @@ describe('voice-session-bridge', () => { expect(receivedDeltas).toEqual(['Hello ', 'world']); expect(completed).toBe(true); - expect(handle.runId).toBeDefined(); + expect(handle.turnId).toBeDefined(); expect(typeof handle.abort).toBe('function'); }); @@ -131,13 +134,7 @@ describe('voice-session-bridge', () => { { type: 'error', message: 'Provider unavailable' }, ]; const session = makeStreamingSession(events); - - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => session, - resolveAttachments: () => [], - deriveDefaultStrictSideEffects: () => false, - }); - setVoiceBridgeOrchestrator(orchestrator); + injectDeps(() => session); const receivedErrors: string[] = []; await startVoiceTurn({ @@ -154,7 +151,7 @@ describe('voice-session-bridge', () => { expect(receivedErrors).toEqual(['Provider unavailable']); }); - test('abort handle cancels the in-flight run', async () => { + test('abort handle cancels the in-flight turn', async () => { const conversation = createConversation('voice bridge abort test'); let abortCalled = false; @@ -180,12 +177,7 @@ describe('voice-session-bridge', () => { abort: () => { abortCalled = true; }, } as unknown as Session; - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => session, - resolveAttachments: () => [], - deriveDefaultStrictSideEffects: () => false, - }); - setVoiceBridgeOrchestrator(orchestrator); + injectDeps(() => session); const handle = await startVoiceTurn({ conversationId: conversation.id, @@ -200,7 +192,7 @@ describe('voice-session-bridge', () => { expect(abortCalled).toBe(true); }); - test('external AbortSignal triggers run abort', async () => { + test('external AbortSignal triggers turn abort', async () => { const conversation = createConversation('voice bridge signal test'); let abortCalled = false; @@ -226,12 +218,7 @@ describe('voice-session-bridge', () => { abort: () => { abortCalled = true; }, } as unknown as Session; - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => session, - resolveAttachments: () => [], - deriveDefaultStrictSideEffects: () => false, - }); - setVoiceBridgeOrchestrator(orchestrator); + injectDeps(() => session); const ac = new AbortController(); await startVoiceTurn({ @@ -264,12 +251,7 @@ describe('voice-session-bridge', () => { setTurnChannelContext: (ctx: unknown) => { capturedTurnChannelContext = ctx; }, } as unknown as Session; - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => session, - resolveAttachments: () => [], - deriveDefaultStrictSideEffects: () => false, - }); - setVoiceBridgeOrchestrator(orchestrator); + injectDeps(() => session); await startVoiceTurn({ conversationId: conversation.id, @@ -301,12 +283,7 @@ describe('voice-session-bridge', () => { set memoryPolicy(val: Record) { capturedStrictSideEffects = val.strictSideEffects as boolean; }, } as unknown as Session; - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => session, - resolveAttachments: () => [], - deriveDefaultStrictSideEffects: () => false, - }); - setVoiceBridgeOrchestrator(orchestrator); + injectDeps(() => session); await startVoiceTurn({ conversationId: conversation.id, @@ -342,12 +319,7 @@ describe('voice-session-bridge', () => { set memoryPolicy(val: Record) { capturedStrictSideEffects = val.strictSideEffects as boolean; }, } as unknown as Session; - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => session, - resolveAttachments: () => [], - deriveDefaultStrictSideEffects: () => false, - }); - setVoiceBridgeOrchestrator(orchestrator); + injectDeps(() => session); await startVoiceTurn({ conversationId: conversation.id, @@ -381,12 +353,7 @@ describe('voice-session-bridge', () => { set memoryPolicy(val: Record) { capturedStrictSideEffects = val.strictSideEffects as boolean; }, } as unknown as Session; - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => session, - resolveAttachments: () => [], - deriveDefaultStrictSideEffects: () => false, - }); - setVoiceBridgeOrchestrator(orchestrator); + injectDeps(() => session); await startVoiceTurn({ conversationId: conversation.id, @@ -423,12 +390,7 @@ describe('voice-session-bridge', () => { }, } as unknown as Session; - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => session, - resolveAttachments: () => [], - deriveDefaultStrictSideEffects: () => false, - }); - setVoiceBridgeOrchestrator(orchestrator); + injectDeps(() => session); const guardianCtx = { sourceChannel: 'voice' as const, @@ -504,12 +466,7 @@ describe('voice-session-bridge', () => { abort: () => {}, } as unknown as Session; - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => session, - resolveAttachments: () => [], - deriveDefaultStrictSideEffects: () => false, - }); - setVoiceBridgeOrchestrator(orchestrator); + injectDeps(() => session); await startVoiceTurn({ conversationId: conversation.id, @@ -579,12 +536,7 @@ describe('voice-session-bridge', () => { abort: () => {}, } as unknown as Session; - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => session, - resolveAttachments: () => [], - deriveDefaultStrictSideEffects: () => false, - }); - setVoiceBridgeOrchestrator(orchestrator); + injectDeps(() => session); await startVoiceTurn({ conversationId: conversation.id, @@ -646,12 +598,7 @@ describe('voice-session-bridge', () => { abort: () => {}, } as unknown as Session; - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => session, - resolveAttachments: () => [], - deriveDefaultStrictSideEffects: () => false, - }); - setVoiceBridgeOrchestrator(orchestrator); + injectDeps(() => session); await startVoiceTurn({ conversationId: conversation.id, @@ -713,12 +660,7 @@ describe('voice-session-bridge', () => { abort: () => {}, } as unknown as Session; - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => session, - resolveAttachments: () => [], - deriveDefaultStrictSideEffects: () => false, - }); - setVoiceBridgeOrchestrator(orchestrator); + injectDeps(() => session); await startVoiceTurn({ conversationId: conversation.id, @@ -785,12 +727,7 @@ describe('voice-session-bridge', () => { abort: () => {}, } as unknown as Session; - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => session, - resolveAttachments: () => [], - deriveDefaultStrictSideEffects: () => false, - }); - setVoiceBridgeOrchestrator(orchestrator); + injectDeps(() => session); await startVoiceTurn({ conversationId: conversation.id, @@ -841,12 +778,7 @@ describe('voice-session-bridge', () => { abort: () => { abortCalled = true; }, } as unknown as Session; - const orchestrator = new RunOrchestrator({ - getOrCreateSession: async () => session, - resolveAttachments: () => [], - deriveDefaultStrictSideEffects: () => false, - }); - setVoiceBridgeOrchestrator(orchestrator); + injectDeps(() => session); const ac = new AbortController(); ac.abort(); // Pre-abort before calling startVoiceTurn diff --git a/assistant/src/calls/relay-server.ts b/assistant/src/calls/relay-server.ts index a6c5e15bea5..b502fbadc39 100644 --- a/assistant/src/calls/relay-server.ts +++ b/assistant/src/calls/relay-server.ts @@ -811,7 +811,7 @@ export class RelayConnection { const session = getCallSession(this.callSessionId); if (session) { // User message persistence is handled by the session pipeline - // (RunOrchestrator.startRun -> session.persistUserMessage) so we only + // (voice-session-bridge -> session.persistUserMessage) so we only // need to fire the transcript notifier for UI subscribers here. fireCallTranscriptNotifier(session.conversationId, this.callSessionId, 'caller', msg.voicePrompt); } diff --git a/assistant/src/calls/voice-session-bridge.ts b/assistant/src/calls/voice-session-bridge.ts index 55e2b123b91..ba06193f10e 100644 --- a/assistant/src/calls/voice-session-bridge.ts +++ b/assistant/src/calls/voice-session-bridge.ts @@ -1,19 +1,27 @@ /** - * Bridge between voice relay and the daemon session/run pipeline. + * Bridge between voice relay and the daemon session pipeline. * - * Provides a `startVoiceTurn()` function that wraps RunOrchestrator.startRun() - * with voice-specific defaults, translating agent-loop events into simple - * callbacks suitable for real-time TTS streaming. + * Provides a `startVoiceTurn()` function that manages a voice turn + * directly through the session, translating agent-loop events into + * simple callbacks suitable for real-time TTS streaming. * * Dependency injection follows the same module-level setter pattern used by - * setRelayBroadcast in relay-server.ts: the daemon lifecycle injects the - * RunOrchestrator instance at startup via `setVoiceBridgeOrchestrator()`. + * setRelayBroadcast in relay-server.ts: the daemon lifecycle injects + * dependencies at startup via `setVoiceBridgeDeps()`. */ +import type { ChannelId } from '../channels/types.js'; +import { parseChannelId } from '../channels/types.js'; import { getConfig } from '../config/loader.js'; +import type { ServerMessage } from '../daemon/ipc-protocol.js'; +import type { Session } from '../daemon/session.js'; import type { GuardianRuntimeContext } from '../daemon/session-runtime-assembly.js'; -import type { RunOrchestrator, VoiceRunEventSink } from '../runtime/run-orchestrator.js'; +import { resolveChannelCapabilities } from '../daemon/session-runtime-assembly.js'; +import { checkIngressForSecrets } from '../security/secret-ingress.js'; +import { IngressBlockedError } from '../util/errors.js'; import { getLogger } from '../util/logger.js'; +import { buildAssistantEvent } from '../runtime/assistant-event.js'; +import { assistantEventHub } from '../runtime/assistant-event-hub.js'; /** * Matches the exact `[CALL_OPENING]` marker that call-controller sends for @@ -30,20 +38,47 @@ const log = getLogger('voice-session-bridge'); // Module-level dependency injection // --------------------------------------------------------------------------- -let orchestrator: RunOrchestrator | undefined; +export interface VoiceBridgeDeps { + getOrCreateSession: (conversationId: string, transport?: { + channelId: ChannelId; + hints?: string[]; + uxBrief?: string; + }) => Promise; + resolveAttachments: (attachmentIds: string[]) => Array<{ + id: string; + filename: string; + mimeType: string; + data: string; + }>; + deriveDefaultStrictSideEffects: (conversationId: string) => boolean; +} + +let deps: VoiceBridgeDeps | undefined; /** - * Inject the RunOrchestrator instance from daemon lifecycle. + * Inject dependencies from daemon lifecycle. * Must be called during daemon startup before any voice turns are executed. */ -export function setVoiceBridgeOrchestrator(orch: RunOrchestrator): void { - orchestrator = orch; +export function setVoiceBridgeDeps(d: VoiceBridgeDeps): void { + deps = d; } // --------------------------------------------------------------------------- // Types // --------------------------------------------------------------------------- +/** + * Real-time event sink for voice TTS streaming. Agent-loop events are + * forwarded here for real-time text-to-speech without modifying the + * standard channel path. + */ +export interface VoiceRunEventSink { + onTextDelta(text: string): void; + onMessageComplete(): void; + onError(message: string): void; + onToolUse(toolName: string, input: Record): void; +} + export interface VoiceTurnOptions { /** The conversation ID for this voice call's session. */ conversationId: string; @@ -68,8 +103,8 @@ export interface VoiceTurnOptions { } export interface VoiceTurnHandle { - /** The run ID for this turn. */ - runId: string; + /** Unique identifier for this turn. */ + turnId: string; /** Abort the in-flight turn (e.g. for barge-in). */ abort: () => void; } @@ -172,17 +207,23 @@ function buildVoiceCallControlPrompt(opts: { /** * Execute a single voice turn through the daemon session pipeline. * - * Wraps RunOrchestrator.startRun() with voice-specific defaults: + * Manages the session directly with voice-specific defaults: * - sourceChannel: 'voice' - * - eventSink wired to the provided callbacks + * - event sink wired to the provided callbacks * - abort propagated from the returned handle * * The caller (CallController via relay-server) can use the returned handle * to cancel the turn on barge-in. */ export async function startVoiceTurn(opts: VoiceTurnOptions): Promise { - if (!orchestrator) { - throw new Error('Voice bridge not initialized — setVoiceBridgeOrchestrator() was not called'); + if (!deps) { + throw new Error('Voice bridge not initialized — setVoiceBridgeDeps() was not called'); + } + + // Block inbound content that contains secrets + const ingressCheck = checkIngressForSecrets(opts.content); + if (ingressCheck.blocked) { + throw new IngressBlockedError(ingressCheck.userNotice!, ingressCheck.detectedTypes); } const eventSink: VoiceRunEventSink = { @@ -221,40 +262,195 @@ export async function startVoiceTurn(opts: VoiceTurnOptions): Promise setTimeout(resolve, pollIntervalMs)); + waited += pollIntervalMs; + } + if (opts.signal?.aborted) { + throw new Error('Turn aborted while waiting for session'); + } + if (session.isProcessing()) { + throw new Error('Session is already processing a message'); + } + } + + // Configure session for this voice turn + const strictSideEffects = forceStrictSideEffects + ?? deps.deriveDefaultStrictSideEffects(opts.conversationId); + session.memoryPolicy = { + ...session.memoryPolicy, + strictSideEffects, + }; + session.setAssistantId(opts.assistantId ?? 'self'); + session.setGuardianContext(opts.guardianContext ?? null); + session.setCommandIntent(null); + session.setTurnChannelContext({ + userMessageChannel: 'voice', + assistantMessageChannel: 'voice', + }); + session.setChannelCapabilities(resolveChannelCapabilities('voice', undefined)); + session.setVoiceCallControlPrompt(voiceCallControlPrompt); + + const requestId = crypto.randomUUID(); + const turnId = crypto.randomUUID(); + const messageId = session.persistUserMessage(persistedContent, [], requestId); + + // Serialized publish chain so hub subscribers observe events in order. + let hubChain: Promise = Promise.resolve(); + const publishToHub = (msg: ServerMessage): void => { + const msgRecord = msg as unknown as Record; + const msgSessionId = + 'sessionId' in msg && typeof msgRecord.sessionId === 'string' + ? (msgRecord.sessionId as string) + : undefined; + const resolvedSessionId = msgSessionId ?? opts.conversationId; + const event = buildAssistantEvent('self', msg, resolvedSessionId); + hubChain = (async () => { + await hubChain; + try { + await assistantEventHub.publish(event); + } catch (err) { + log.warn({ err }, 'assistant-events hub subscriber threw during voice turn'); + } + })(); + }; + + // Hook into session to intercept confirmation_request and secret_request events. + // Voice auto-denies/auto-allows/auto-resolves these since there's no interactive UI. + const autoDeny = !isGuardian; + const autoAllow = isGuardian; + let lastError: string | null = null; + session.updateClient((msg: ServerMessage) => { + if (msg.type === 'confirmation_request') { + if (autoDeny) { + log.info( + { turnId, toolName: msg.toolName }, + 'Auto-denying confirmation request for voice turn (forceStrictSideEffects)', + ); + session.handleConfirmationResponse( + msg.requestId, + 'deny', + undefined, + undefined, + `Permission denied for "${msg.toolName}": this voice call does not have interactive approval capabilities. Side-effect tools are not available for non-guardian voice callers. In your next assistant reply, explain briefly that this action requires guardian-level access and cannot be performed during this call.`, + ); + publishToHub(msg); + return; + } + if (autoAllow) { + log.info( + { turnId, toolName: msg.toolName }, + 'Auto-approving confirmation request for guardian voice turn', + ); + session.handleConfirmationResponse( + msg.requestId, + 'allow', + undefined, + undefined, + `Permission approved for "${msg.toolName}": this is a verified guardian voice call.`, + ); + publishToHub(msg); + return; + } + } else if (msg.type === 'secret_request') { + // Voice has no secret-entry UI, so resolve immediately + log.info( + { turnId, service: msg.service, field: msg.field }, + 'Auto-resolving secret request for voice turn (no secret-entry UI)', + ); + session.handleSecretResponse(msg.requestId, undefined, 'store'); + publishToHub(msg); + return; + } + publishToHub(msg); + }); + + // Fire-and-forget the agent loop + const cleanup = () => { + // Reset channel capabilities so a subsequent IPC/desktop session on the + // same conversation is not incorrectly treated as a voice client. + session.setChannelCapabilities(null); + session.setGuardianContext(null); + session.setCommandIntent(null); + session.setAssistantId('self'); + session.setVoiceCallControlPrompt(null); + // Reset the session's client callback to a no-op so the stale + // closure doesn't intercept events from future turns on the same session. + session.updateClient(() => {}, true); + }; + + void (async () => { + try { + await session.runAgentLoop(persistedContent, messageId, (msg: ServerMessage) => { + if (msg.type === 'error') { + lastError = msg.message; + } else if (msg.type === 'session_error') { + lastError = msg.userMessage; + } + publishToHub(msg); + + // Forward voice-relevant events to the real-time event sink + if (msg.type === 'assistant_text_delta') { + eventSink.onTextDelta(msg.text); + } else if (msg.type === 'message_complete') { + eventSink.onMessageComplete(); + } else if (msg.type === 'generation_cancelled') { + // Treat cancellation as a completed turn so the voice + // turnComplete promise settles instead of hanging forever. + eventSink.onMessageComplete(); + } else if (msg.type === 'error') { + eventSink.onError(msg.message); + } else if (msg.type === 'session_error') { + eventSink.onError(msg.userMessage); + } else if (msg.type === 'tool_use_start') { + eventSink.onToolUse(msg.toolName, msg.input); + } + }); + if (lastError) { + log.error({ turnId, error: lastError }, 'Voice turn failed (error event from agent loop)'); + } + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + log.error({ err, turnId }, 'Voice turn failed'); + eventSink.onError(message); + } finally { + cleanup(); + } + })(); + + const abortFn = () => { + if (session.currentRequestId === requestId) { + session.abort(); + } + }; // If the caller provided an external AbortSignal (e.g. from a - // RelayConnection's AbortController), wire it to the run's abort. + // RelayConnection's AbortController), wire it to the turn's abort. if (opts.signal) { if (opts.signal.aborted) { - abort(); + abortFn(); } else { - opts.signal.addEventListener('abort', () => abort(), { once: true }); + opts.signal.addEventListener('abort', () => abortFn(), { once: true }); } } return { - runId: run.id, - abort, + turnId, + abort: abortFn, }; } diff --git a/assistant/src/daemon/lifecycle.ts b/assistant/src/daemon/lifecycle.ts index 99356a2e474..04902428b86 100644 --- a/assistant/src/daemon/lifecycle.ts +++ b/assistant/src/daemon/lifecycle.ts @@ -7,7 +7,7 @@ import { config as dotenvConfig } from 'dotenv'; import { reconcileCallsOnStartup } from '../calls/call-recovery.js'; import { setRelayBroadcast } from '../calls/relay-server.js'; import { TwilioConversationRelayProvider } from '../calls/twilio-provider.js'; -import { setVoiceBridgeOrchestrator } from '../calls/voice-session-bridge.js'; +import { setVoiceBridgeDeps } from '../calls/voice-session-bridge.js'; import { getQdrantUrlEnv, getRuntimeHttpHost, @@ -23,6 +23,7 @@ import { installTemplates } from '../hooks/templates.js'; import { initSentry } from '../instrument.js'; import { initLogfire } from '../logfire.js'; import * as attachmentsStore from '../memory/attachments-store.js'; +import * as conversationStore from '../memory/conversation-store.js'; import { initializeDb } from '../memory/db.js'; import { startMemoryJobsWorker } from '../memory/jobs-worker.js'; import { initQdrantClient } from '../memory/qdrant-client.js'; @@ -296,8 +297,6 @@ export async function runDaemon(): Promise { const hostname = getRuntimeHttpHost(); - const runOrchestrator = server.createRunOrchestrator(); - runtimeHttp = new RuntimeHttpServer({ port: httpPort, hostname, @@ -306,7 +305,6 @@ export async function runDaemon(): Promise { server.processMessage(conversationId, content, attachmentIds, options, sourceChannel, sourceInterface), persistAndProcessMessage: (conversationId, content, attachmentIds, options, sourceChannel, sourceInterface) => server.persistAndProcessMessage(conversationId, content, attachmentIds, options, sourceChannel, sourceInterface), - runOrchestrator, interfacesDir: getInterfacesDir(), approvalCopyGenerator: createApprovalCopyGenerator(), approvalConversationGenerator: createApprovalConversationGenerator(), @@ -324,10 +322,23 @@ export async function runDaemon(): Promise { }, }); - // Inject the voice bridge orchestrator BEFORE attempting to start the HTTP - // server. The bridge only needs the RunOrchestrator instance (already created - // above) and must be available even when the HTTP server fails to bind. - setVoiceBridgeOrchestrator(runOrchestrator); + // Inject voice bridge deps BEFORE attempting to start the HTTP server. + // The bridge must be available even when the HTTP server fails to bind. + setVoiceBridgeDeps({ + getOrCreateSession: (conversationId, transport) => + server.getSessionForMessages(conversationId), + resolveAttachments: (attachmentIds) => + attachmentsStore.getAttachmentsByIds(attachmentIds).map((a) => ({ + id: a.id, + filename: a.originalFilename, + mimeType: a.mimeType, + data: a.dataBase64, + })), + deriveDefaultStrictSideEffects: (conversationId) => { + const threadType = conversationStore.getConversationThreadType(conversationId); + return threadType === 'private'; + }, + }); try { await runtimeHttp.start(); setRelayBroadcast((msg) => server.broadcast(msg)); diff --git a/assistant/src/daemon/server.ts b/assistant/src/daemon/server.ts index 1c0b727aebf..35c812d1975 100644 --- a/assistant/src/daemon/server.ts +++ b/assistant/src/daemon/server.ts @@ -14,7 +14,6 @@ import { provenanceFromGuardianContext } from '../memory/conversation-store.js'; import { RateLimitProvider } from '../providers/ratelimit.js'; import { getFailoverProvider, initializeProviders } from '../providers/registry.js'; import * as pendingInteractions from '../runtime/pending-interactions.js'; -import { RunOrchestrator } from '../runtime/run-orchestrator.js'; import { checkIngressForSecrets } from '../security/secret-ingress.js'; import { cleanupRecordingsOnDisconnect } from './handlers/recording.js'; import { getSubagentManager } from '../subagent/index.js'; @@ -908,20 +907,4 @@ export class DaemonServer { return this.getOrCreateSession(conversationId, undefined, true); } - createRunOrchestrator(): RunOrchestrator { - return new RunOrchestrator({ - getOrCreateSession: (conversationId, transport) => - this.getOrCreateSession(conversationId, undefined, true, transport ? { transport } : undefined), - resolveAttachments: (attachmentIds) => - attachmentsStore.getAttachmentsByIds(attachmentIds).map((a) => ({ - id: a.id, - filename: a.originalFilename, - mimeType: a.mimeType, - data: a.dataBase64, - })), - deriveDefaultStrictSideEffects: (conversationId) => - this.deriveMemoryPolicy(conversationId).strictSideEffects, - }); - } - } diff --git a/assistant/src/memory/conversation-store.ts b/assistant/src/memory/conversation-store.ts index 8aefaf431a9..f4062e1f94c 100644 --- a/assistant/src/memory/conversation-store.ts +++ b/assistant/src/memory/conversation-store.ts @@ -12,7 +12,7 @@ import { createRowMapper } from '../util/row-mapper.js'; import { deleteOrphanAttachments } from './attachments-store.js'; import { getDb, rawAll, rawExec,rawGet } from './db.js'; import { indexMessageNow } from './indexer.js'; -import { channelInboundEvents, conversations, llmRequestLogs,memoryEmbeddings, memoryItemEntities, memoryItems, memoryItemSources, memorySegments, messageAttachments, messageRuns, messages, toolInvocations } from './schema.js'; +import { channelInboundEvents, conversations, llmRequestLogs,memoryEmbeddings, memoryItemEntities, memoryItems, memoryItemSources, memorySegments, messageAttachments, messages, toolInvocations } from './schema.js'; import { buildFtsMatchQuery } from './search/lexical.js'; const log = getLogger('conversation-store'); @@ -576,10 +576,6 @@ export function deleteMessageById(messageId: string): DeletedMemoryIds { const candidateItemIds = linkedItems.map((r) => r.memoryItemId); // Detach nullable FK references so the cascade doesn't destroy them. - tx.update(messageRuns) - .set({ messageId: null }) - .where(eq(messageRuns.messageId, messageId)) - .run(); tx.update(channelInboundEvents) .set({ messageId: null }) .where(eq(channelInboundEvents.messageId, messageId)) diff --git a/assistant/src/memory/runs-store.ts b/assistant/src/memory/runs-store.ts deleted file mode 100644 index d25ca028495..00000000000 --- a/assistant/src/memory/runs-store.ts +++ /dev/null @@ -1,306 +0,0 @@ -/** - * CRUD store for message runs (approval flow state). - * - * Runs track the lifecycle of an agent loop triggered by a user message: - * running → needs_confirmation → running → completed | failed - * running → needs_secret → running → completed | failed - */ - -import { and, eq, inArray } from 'drizzle-orm'; -import { v4 as uuid } from 'uuid'; - -import { getDb } from './db.js'; -import { messageRuns } from './schema.js'; - -// --------------------------------------------------------------------------- -// Types -// --------------------------------------------------------------------------- - -export type RunStatus = 'running' | 'needs_confirmation' | 'needs_secret' | 'completed' | 'failed'; - -export interface PendingConfirmation { - toolName: string; - toolUseId: string; - input: Record; - riskLevel: string; - executionTarget?: 'sandbox' | 'host'; - allowlistOptions?: Array<{ label: string; pattern: string }>; - scopeOptions?: Array<{ label: string; scope: string }>; - /** When false, the client should hide "always allow" / trust-rule persistence affordances. */ - persistentDecisionsAllowed?: boolean; -} - -export interface PendingSecret { - requestId: string; - service: string; - field: string; - label: string; - description?: string; - placeholder?: string; - purpose?: string; - allowOneTimeSend?: boolean; -} - -export interface Run { - id: string; - conversationId: string; - messageId: string | null; - status: RunStatus; - pendingConfirmation: PendingConfirmation | null; - pendingSecret: PendingSecret | null; - inputTokens: number; - outputTokens: number; - estimatedCost: number; - error: string | null; - createdAt: number; - updatedAt: number; -} - -export interface RunUsage { - inputTokens?: number; - outputTokens?: number; - estimatedCost?: number; -} - -// --------------------------------------------------------------------------- -// Helpers -// --------------------------------------------------------------------------- - -function rowToRun(row: typeof messageRuns.$inferSelect): Run { - let pendingConfirmation: PendingConfirmation | null = null; - if (row.pendingConfirmation) { - try { pendingConfirmation = JSON.parse(row.pendingConfirmation); } catch { /* malformed */ } - } - let pendingSecret: PendingSecret | null = null; - if (row.pendingSecret) { - try { pendingSecret = JSON.parse(row.pendingSecret); } catch { /* malformed */ } - } - return { - id: row.id, - conversationId: row.conversationId, - messageId: row.messageId, - status: row.status as RunStatus, - pendingConfirmation, - pendingSecret, - inputTokens: row.inputTokens, - outputTokens: row.outputTokens, - estimatedCost: row.estimatedCost, - error: row.error, - createdAt: row.createdAt, - updatedAt: row.updatedAt, - }; -} - -// --------------------------------------------------------------------------- -// Store -// --------------------------------------------------------------------------- - -export function createRun( - conversationId: string, - messageId?: string, -): Run { - const db = getDb(); - const now = Date.now(); - const id = uuid(); - - const row = { - id, - conversationId, - messageId: messageId ?? null, - status: 'running' as const, - pendingConfirmation: null, - pendingSecret: null, - inputTokens: 0, - outputTokens: 0, - estimatedCost: 0, - error: null, - createdAt: now, - updatedAt: now, - }; - - db.insert(messageRuns).values(row).run(); - - return rowToRun(row); -} - -export function getRun(runId: string): Run | null { - const db = getDb(); - const row = db.select().from(messageRuns).where(eq(messageRuns.id, runId)).get(); - return row ? rowToRun(row) : null; -} - -export function setRunConfirmation( - runId: string, - confirmation: PendingConfirmation, -): void { - const db = getDb(); - const now = Date.now(); - db.update(messageRuns) - .set({ - status: 'needs_confirmation', - pendingConfirmation: JSON.stringify(confirmation), - updatedAt: now, - }) - .where(eq(messageRuns.id, runId)) - .run(); -} - -export function clearRunConfirmation(runId: string): void { - const db = getDb(); - const now = Date.now(); - db.update(messageRuns) - .set({ - status: 'running', - pendingConfirmation: null, - updatedAt: now, - }) - .where(eq(messageRuns.id, runId)) - .run(); -} - -export function setRunSecret( - runId: string, - secret: PendingSecret, -): void { - const db = getDb(); - const now = Date.now(); - db.update(messageRuns) - .set({ - status: 'needs_secret', - pendingSecret: JSON.stringify(secret), - updatedAt: now, - }) - .where(eq(messageRuns.id, runId)) - .run(); -} - -export function clearRunSecret(runId: string): void { - const db = getDb(); - const now = Date.now(); - db.update(messageRuns) - .set({ - status: 'running', - pendingSecret: null, - updatedAt: now, - }) - .where(eq(messageRuns.id, runId)) - .run(); -} - -export function completeRun(runId: string, usage?: RunUsage): void { - const db = getDb(); - const now = Date.now(); - db.update(messageRuns) - .set({ - status: 'completed', - pendingConfirmation: null, - ...(usage?.inputTokens != null ? { inputTokens: usage.inputTokens } : {}), - ...(usage?.outputTokens != null ? { outputTokens: usage.outputTokens } : {}), - ...(usage?.estimatedCost != null ? { estimatedCost: usage.estimatedCost } : {}), - updatedAt: now, - }) - .where(eq(messageRuns.id, runId)) - .run(); -} - -export function failRun(runId: string, error: string): void { - const db = getDb(); - const now = Date.now(); - db.update(messageRuns) - .set({ - status: 'failed', - pendingConfirmation: null, - error, - updatedAt: now, - }) - .where(eq(messageRuns.id, runId)) - .run(); -} - -// --------------------------------------------------------------------------- -// Pending-confirmation lookups -// --------------------------------------------------------------------------- - -/** Summary of a run awaiting confirmation, used by channel approval flows. */ -export interface PendingRunInfo { - runId: string; - /** The prompter-level request ID stored inside PendingConfirmation.toolUseId. */ - requestId: string; - toolName: string; - input: Record; - riskLevel: string; - /** When false, persistent trust rules (approve_always) are not allowed. */ - persistentDecisionsAllowed?: boolean; -} - -/** - * Find all runs in `needs_confirmation` state for a given conversation. - * - * Returns structured info for each pending run so channel adapters can - * render approval prompts and route decisions without reaching into - * raw DB rows. - */ -export function getPendingConfirmationsByConversation(conversationId: string): PendingRunInfo[] { - const db = getDb(); - const rows = db - .select() - .from(messageRuns) - .where( - and( - eq(messageRuns.conversationId, conversationId), - eq(messageRuns.status, 'needs_confirmation'), - ), - ) - .all(); - - const results: PendingRunInfo[] = []; - for (const row of rows) { - const run = rowToRun(row); - if (!run.pendingConfirmation) continue; - results.push({ - runId: run.id, - requestId: run.pendingConfirmation.toolUseId, - toolName: run.pendingConfirmation.toolName, - input: run.pendingConfirmation.input, - riskLevel: run.pendingConfirmation.riskLevel, - persistentDecisionsAllowed: run.pendingConfirmation.persistentDecisionsAllowed, - }); - } - return results; -} - -// --------------------------------------------------------------------------- -// Orphan recovery -// --------------------------------------------------------------------------- - -/** - * Mark all non-terminal runs as failed. - * Called on startup to recover from daemon restarts that left runs - * in running/needs_confirmation/needs_secret with no in-memory state to resolve them. - * Returns the number of rows affected. - */ -export function failOrphanedRuns(): number { - const db = getDb(); - const now = Date.now(); - const activeStatuses = ['running', 'needs_confirmation', 'needs_secret']; - - // Count first so we can report how many were recovered. - const active = db.select({ id: messageRuns.id }) - .from(messageRuns) - .where(inArray(messageRuns.status, activeStatuses)) - .all(); - - if (active.length === 0) return 0; - - db.update(messageRuns) - .set({ - status: 'failed', - pendingConfirmation: null, - error: 'Run was interrupted (daemon restart)', - updatedAt: now, - }) - .where(inArray(messageRuns.status, activeStatuses)) - .run(); - - return active.length; -} diff --git a/assistant/src/memory/schema.ts b/assistant/src/memory/schema.ts index 114bd012b8f..7dbc904a93b 100644 --- a/assistant/src/memory/schema.ts +++ b/assistant/src/memory/schema.ts @@ -224,26 +224,6 @@ export const channelInboundEvents = sqliteTable('channel_inbound_events', { updatedAt: integer('updated_at').notNull(), }); -// ── Message Runs (approval flow) ───────────────────────────────────── - -export const messageRuns = sqliteTable('message_runs', { - id: text('id').primaryKey(), - conversationId: text('conversation_id') - .notNull() - .references(() => conversations.id, { onDelete: 'cascade' }), - messageId: text('message_id') - .references(() => messages.id, { onDelete: 'cascade' }), - status: text('status').notNull().default('running'), // running | needs_confirmation | needs_secret | completed | failed - pendingConfirmation: text('pending_confirmation'), // JSON when status=needs_confirmation - pendingSecret: text('pending_secret'), // JSON when status=needs_secret - inputTokens: integer('input_tokens').notNull().default(0), - outputTokens: integer('output_tokens').notNull().default(0), - estimatedCost: real('estimated_cost').notNull().default(0), - error: text('error'), - createdAt: integer('created_at').notNull(), - updatedAt: integer('updated_at').notNull(), -}); - export const memoryCheckpoints = sqliteTable('memory_checkpoints', { key: text('key').primaryKey(), value: text('value').notNull(), diff --git a/assistant/src/runtime/http-server.ts b/assistant/src/runtime/http-server.ts index e733c39fcef..1501c105668 100644 --- a/assistant/src/runtime/http-server.ts +++ b/assistant/src/runtime/http-server.ts @@ -110,15 +110,7 @@ import { handlePairingRequest, handlePairingStatus, } from './routes/pairing-routes.js'; -import { - handleAddTrustRule, - handleCreateRun, - handleGetRun, - handleRunDecision, - handleRunSecret, -} from './routes/run-routes.js'; import { handleAddSecret } from './routes/secret-routes.js'; -import type { RunOrchestrator } from './run-orchestrator.js'; // Re-export for consumers export { isPrivateAddress } from './middleware/auth.js'; @@ -159,7 +151,6 @@ export class RuntimeHttpServer { private bearerToken: string | undefined; private processMessage?: MessageProcessor; private persistAndProcessMessage?: NonBlockingMessageProcessor; - private runOrchestrator?: RunOrchestrator; private approvalCopyGenerator?: ApprovalCopyGenerator; private approvalConversationGenerator?: ApprovalConversationGenerator; private interfacesDir: string | null; @@ -177,7 +168,6 @@ export class RuntimeHttpServer { this.bearerToken = options.bearerToken; this.processMessage = options.processMessage; this.persistAndProcessMessage = options.persistAndProcessMessage; - this.runOrchestrator = options.runOrchestrator; this.approvalCopyGenerator = options.approvalCopyGenerator; this.approvalConversationGenerator = options.approvalConversationGenerator; this.interfacesDir = options.interfacesDir ?? null; @@ -603,25 +593,6 @@ export class RuntimeHttpServer { }); } - if (endpoint === 'runs' && req.method === 'POST') { - if (!this.runOrchestrator) return Response.json({ error: 'Run orchestration not configured' }, { status: 503 }); - return await handleCreateRun(req, this.runOrchestrator); - } - - const runsMatch = endpoint.match(/^runs\/([^/]+)(\/decision|\/trust-rule|\/secret)?$/); - if (runsMatch) { - if (!this.runOrchestrator) return Response.json({ error: 'Run orchestration not configured' }, { status: 503 }); - const runId = runsMatch[1]; - if (runsMatch[2] === '/decision' && req.method === 'POST') return await handleRunDecision(runId, req, this.runOrchestrator); - if (runsMatch[2] === '/secret' && req.method === 'POST') return await handleRunSecret(runId, req, this.runOrchestrator); - if (runsMatch[2] === '/trust-rule' && req.method === 'POST') { - const run = this.runOrchestrator.getRun(runId); - if (!run) return Response.json({ error: 'Run not found' }, { status: 404 }); - return await handleAddTrustRule(runId, req); - } - if (req.method === 'GET') return handleGetRun(runId, this.runOrchestrator); - } - const interfacesMatch = endpoint.match(/^interfaces\/(.+)$/); if (interfacesMatch && req.method === 'GET') return this.handleGetInterface(interfacesMatch[1]); diff --git a/assistant/src/runtime/http-types.ts b/assistant/src/runtime/http-types.ts index dea5247c5c0..dc6bdde72a1 100644 --- a/assistant/src/runtime/http-types.ts +++ b/assistant/src/runtime/http-types.ts @@ -6,8 +6,6 @@ import type { Session } from '../daemon/session.js'; import type { GuardianRuntimeContext } from '../daemon/session-runtime-assembly.js'; import type { ApprovalMessageContext, ComposeApprovalMessageGenerativeOptions } from './approval-message-composer.js'; import type { AssistantEventHub } from './assistant-event-hub.js'; -import type { RunOrchestrator } from './run-orchestrator.js'; - /** * Daemon-injected function that generates approval copy using a provider. * Returns generated text or `null` on failure (caller falls back to deterministic text). @@ -115,8 +113,6 @@ export interface RuntimeHttpServerOptions { processMessage?: MessageProcessor; /** Non-blocking processor for POST /messages (persists + fires agent loop). */ persistAndProcessMessage?: NonBlockingMessageProcessor; - /** Run orchestrator for the approval-flow run endpoints. */ - runOrchestrator?: RunOrchestrator; /** Root directory for interface files on disk. */ interfacesDir?: string; /** Daemon-injected generator for approval copy (provider-backed). */ diff --git a/assistant/src/runtime/routes/events-routes.ts b/assistant/src/runtime/routes/events-routes.ts index 902ed9a6f0b..ff35c98e9ca 100644 --- a/assistant/src/runtime/routes/events-routes.ts +++ b/assistant/src/runtime/routes/events-routes.ts @@ -49,8 +49,8 @@ export function handleSubscribeAssistantEvents( // closures are in place before events can arrive. `controllerRef` is set // synchronously inside ReadableStream's start(), so it is non-null by the // time any event or eviction fires. - // 'self' is the assistantId that RunOrchestrator assigns to all HTTP-run - // events (see buildAssistantEvent('self', ...) in run-orchestrator.ts). + // 'self' is the assistantId used by buildAssistantEvent('self', ...) for + // all HTTP and voice session events. let controllerRef: ReadableStreamDefaultController | null = null; let heartbeatTimer: ReturnType | null = null; let sub!: AssistantEventSubscription; diff --git a/assistant/src/runtime/routes/run-routes.ts b/assistant/src/runtime/routes/run-routes.ts deleted file mode 100644 index 1670e948798..00000000000 --- a/assistant/src/runtime/routes/run-routes.ts +++ /dev/null @@ -1,289 +0,0 @@ -/** - * Route handlers for run creation, status, decisions, and trust rules. - */ -import { CHANNEL_IDS, INTERFACE_IDS, parseChannelId, parseInterfaceId } from '../../channels/types.js'; -import * as attachmentsStore from '../../memory/attachments-store.js'; -import { getOrCreateConversation } from '../../memory/conversation-key-store.js'; -import * as runsStore from '../../memory/runs-store.js'; -import { addRule } from '../../permissions/trust-store.js'; -import { getTool } from '../../tools/registry.js'; -import { getLogger } from '../../util/logger.js'; -import type { RunOrchestrator } from '../run-orchestrator.js'; - -const log = getLogger('runtime-http'); - -export async function handleCreateRun( - req: Request, - runOrchestrator: RunOrchestrator, -): Promise { - const body = await req.json() as { - conversationKey?: string; - content?: string; - attachmentIds?: string[]; - sourceChannel?: string; - interface?: string; - }; - - const { conversationKey, content, attachmentIds } = body; - if (!body.sourceChannel || typeof body.sourceChannel !== 'string') { - return Response.json({ error: 'sourceChannel is required' }, { status: 400 }); - } - const sourceChannel = parseChannelId(body.sourceChannel); - - if (!sourceChannel) { - return Response.json( - { error: `Invalid sourceChannel: ${body.sourceChannel}. Valid values: ${CHANNEL_IDS.join(', ')}` }, - { status: 400 }, - ); - } - - if (!body.interface || typeof body.interface !== 'string') { - return Response.json({ error: 'interface is required' }, { status: 400 }); - } - const sourceInterface = parseInterfaceId(body.interface); - if (!sourceInterface) { - return Response.json( - { error: `Invalid interface: ${body.interface}. Valid values: ${INTERFACE_IDS.join(', ')}` }, - { status: 400 }, - ); - } - - if (!conversationKey) { - return Response.json({ error: 'conversationKey is required' }, { status: 400 }); - } - - if (content != null && typeof content !== 'string') { - return Response.json({ error: 'content must be a string' }, { status: 400 }); - } - - const trimmedContent = typeof content === 'string' ? content.trim() : ''; - const hasAttachments = Array.isArray(attachmentIds) && attachmentIds.length > 0; - - if (trimmedContent.length === 0 && !hasAttachments) { - return Response.json({ error: 'content or attachmentIds is required' }, { status: 400 }); - } - - if (hasAttachments) { - const resolved = attachmentsStore.getAttachmentsByIds(attachmentIds); - if (resolved.length !== attachmentIds.length) { - const resolvedIds = new Set(resolved.map((a) => a.id)); - const missing = attachmentIds.filter((id) => !resolvedIds.has(id)); - return Response.json( - { error: `Attachment IDs not found: ${missing.join(', ')}` }, - { status: 400 }, - ); - } - } - - const mapping = getOrCreateConversation(conversationKey); - - try { - const { run } = await runOrchestrator.startRun( - mapping.conversationId, - content ?? '', - hasAttachments ? attachmentIds : undefined, - { sourceChannel, sourceInterface }, - ); - return Response.json({ - id: run.id, - status: run.status, - messageId: run.messageId, - createdAt: new Date(run.createdAt).toISOString(), - }, { status: 201 }); - } catch (err) { - if (err instanceof Error && err.message === 'Session is already processing a message') { - return Response.json( - { error: 'Session is busy processing another message. Please retry.' }, - { status: 409 }, - ); - } - throw err; - } -} - -export function handleGetRun( - runId: string, - runOrchestrator: RunOrchestrator, -): Response { - const run = runOrchestrator.getRun(runId); - if (!run) { - return Response.json({ error: 'Run not found' }, { status: 404 }); - } - - return Response.json({ - id: run.id, - status: run.status, - messageId: run.messageId, - pendingConfirmation: run.pendingConfirmation, - pendingSecret: run.pendingSecret, - error: run.error, - createdAt: new Date(run.createdAt).toISOString(), - updatedAt: new Date(run.updatedAt).toISOString(), - }); -} - -export async function handleRunDecision( - runId: string, - req: Request, - runOrchestrator: RunOrchestrator, -): Promise { - const run = runOrchestrator.getRun(runId); - if (!run) { - return Response.json({ error: 'Run not found' }, { status: 404 }); - } - - const body = await req.json() as { decision?: string }; - const { decision } = body; - - if (decision !== 'allow' && decision !== 'deny') { - return Response.json( - { error: 'decision must be "allow" or "deny"' }, - { status: 400 }, - ); - } - - const result = runOrchestrator.submitDecision(runId, decision); - if (result === 'run_not_found') { - return Response.json( - { error: 'Run not found' }, - { status: 404 }, - ); - } - if (result === 'no_pending_decision') { - return Response.json( - { error: 'No confirmation pending for this run' }, - { status: 409 }, - ); - } - - return Response.json({ accepted: true }); -} - -/** - * Add a trust rule, but ONLY if there is a pending confirmation for the - * given run. The caller-supplied pattern and scope are validated against - * the server-generated allowlist/scope options that were sent with the - * original confirmation_request — preventing arbitrary rule injection. - */ -export async function handleAddTrustRule( - runId: string, - req: Request, -): Promise { - const run = runsStore.getRun(runId); - if (!run) { - return Response.json({ error: 'Run not found' }, { status: 404 }); - } - - if (run.status !== 'needs_confirmation' || !run.pendingConfirmation) { - return Response.json( - { error: 'No confirmation pending for this run' }, - { status: 409 }, - ); - } - - if (run.pendingConfirmation.persistentDecisionsAllowed === false) { - return Response.json( - { error: 'Persistent trust rules are not allowed for this tool invocation' }, - { status: 403 }, - ); - } - - const body = await req.json() as { - pattern?: string; - scope?: string; - decision?: string; - }; - - const { pattern, scope, decision } = body; - - if (!pattern || typeof pattern !== 'string') { - return Response.json({ error: 'pattern is required' }, { status: 400 }); - } - if (!scope || typeof scope !== 'string') { - return Response.json({ error: 'scope is required' }, { status: 400 }); - } - if (decision !== 'allow' && decision !== 'deny') { - return Response.json({ error: 'decision must be "allow" or "deny"' }, { status: 400 }); - } - - const confirmation = run.pendingConfirmation; - - // Validate pattern against server-provided allowlist options - const validPatterns = (confirmation.allowlistOptions ?? []).map((o) => o.pattern); - if (!validPatterns.includes(pattern)) { - return Response.json( - { error: 'pattern does not match any server-provided allowlist option' }, - { status: 403 }, - ); - } - - // Validate scope against server-provided scope options - const validScopes = (confirmation.scopeOptions ?? []).map((o) => o.scope); - if (!validScopes.includes(scope)) { - return Response.json( - { error: 'scope does not match any server-provided scope option' }, - { status: 403 }, - ); - } - - try { - // Only persist executionTarget for skill-origin tools — core tools don't - // set it in their PolicyContext, so a persisted value would prevent the - // rule from ever matching on subsequent permission checks. - const tool = getTool(confirmation.toolName); - const executionTarget = tool?.origin === 'skill' ? confirmation.executionTarget : undefined; - addRule(confirmation.toolName, pattern, scope, decision, undefined, { - executionTarget, - }); - log.info( - { tool: confirmation.toolName, pattern, scope, decision, runId }, - 'Trust rule added via HTTP (bound to pending confirmation)', - ); - return Response.json({ accepted: true }); - } catch (err) { - log.error({ err }, 'Failed to add trust rule'); - return Response.json({ error: 'Failed to add trust rule' }, { status: 500 }); - } -} - -export async function handleRunSecret( - runId: string, - req: Request, - runOrchestrator: RunOrchestrator, -): Promise { - const run = runOrchestrator.getRun(runId); - if (!run) { - return Response.json({ error: 'Run not found' }, { status: 404 }); - } - - const body = await req.json() as { - value?: string; - delivery?: string; - }; - - const { value, delivery } = body; - - if (delivery !== undefined && delivery !== 'store' && delivery !== 'transient_send') { - return Response.json( - { error: 'delivery must be "store" or "transient_send"' }, - { status: 400 }, - ); - } - - const result = runOrchestrator.submitSecret( - runId, - value, - delivery as 'store' | 'transient_send' | undefined, - ); - if (result === 'run_not_found') { - return Response.json({ error: 'Run not found' }, { status: 404 }); - } - if (result === 'no_pending_secret') { - return Response.json( - { error: 'No secret pending for this run' }, - { status: 409 }, - ); - } - - return Response.json({ accepted: true }); -} diff --git a/assistant/src/runtime/run-orchestrator.ts b/assistant/src/runtime/run-orchestrator.ts deleted file mode 100644 index 2ff9f270314..00000000000 --- a/assistant/src/runtime/run-orchestrator.ts +++ /dev/null @@ -1,580 +0,0 @@ -/** - * Orchestrates run lifecycle for the HTTP runtime API. - * - * A "run" wraps a single agent-loop execution, tracking its state through: - * running → needs_confirmation → running → completed | failed - * running → needs_secret → running → completed | failed - * - * When a tool needs permission, the orchestrator intercepts the - * confirmation_request from the session's prompter and records it in - * the run store. Similarly, when a tool needs a secret (e.g. - * credential_store prompt), the orchestrator intercepts the - * secret_request and records it. The client can then poll the run - * status and submit a decision or secret via the respective endpoints. - */ - -import type { ChannelId, InterfaceId, TurnChannelContext } from '../channels/types.js'; -import { parseChannelId, parseInterfaceId } from '../channels/types.js'; -import type { ServerMessage } from '../daemon/ipc-protocol.js'; -import type { Session } from '../daemon/session.js'; -import type { GuardianRuntimeContext } from '../daemon/session-runtime-assembly.js'; -import { resolveChannelCapabilities } from '../daemon/session-runtime-assembly.js'; -import type { Run } from '../memory/runs-store.js'; -import * as runsStore from '../memory/runs-store.js'; -import type { UserDecision } from '../permissions/types.js'; -import { checkIngressForSecrets } from '../security/secret-ingress.js'; -import { IngressBlockedError } from '../util/errors.js'; -import { getLogger } from '../util/logger.js'; -import { buildAssistantEvent } from './assistant-event.js'; -import { assistantEventHub } from './assistant-event-hub.js'; - -const log = getLogger('run-orchestrator'); - -// --------------------------------------------------------------------------- -// Types -// --------------------------------------------------------------------------- - -/** - * Real-time event sink for voice TTS streaming. When provided to startRun(), - * agent-loop events are forwarded here alongside the existing assistantEventHub - * publication. This enables voice relay to receive streaming text deltas for - * real-time text-to-speech without modifying the standard channel path. - */ -export interface VoiceRunEventSink { - onTextDelta(text: string): void; - onMessageComplete(): void; - onError(message: string): void; - onToolUse(toolName: string, input: Record): void; -} - -/** - * Handle returned by startRun() that allows callers to abort an in-flight - * run. Used by voice barge-in to cancel the current turn without crashing - * session state. - */ -export interface RunHandle { - run: Run; - abort: () => void; -} - -interface PendingRunState { - prompterRequestId: string; - session: Session; -} - -export interface RunOrchestratorDeps { - getOrCreateSession: (conversationId: string, transport?: { - channelId: ChannelId; - hints?: string[]; - uxBrief?: string; - }) => Promise; - resolveAttachments: (attachmentIds: string[]) => Array<{ - id: string; - filename: string; - mimeType: string; - data: string; - }>; - /** - * Re-derive the default `strictSideEffects` value for a conversation - * from its thread type (e.g. private → true, standard → false). - * Called when `forceStrictSideEffects` is not explicitly provided so - * the session never retains a stale override from a prior run. - */ - deriveDefaultStrictSideEffects: (conversationId: string) => boolean; -} - -export interface RunStartOptions { - /** - * When true, forces `strictSideEffects` on the session's memory policy - * so that all side-effect tools trigger the approval/confirmation flow, - * even if existing allow rules would normally auto-approve them. - * Used for non-guardian actors in guardian-gated channels. - */ - forceStrictSideEffects?: boolean; - /** - * The originating channel (e.g. 'telegram', 'slack'). When provided, - * channel capabilities are resolved for this channel instead of the - * default 'vellum'. - */ - sourceChannel?: ChannelId; - /** - * The originating interface (e.g. 'macos', 'ios', 'telegram'). When - * provided, the session's turn interface context is set to this value - * so interface-aware metadata flows through the agent loop. - */ - sourceInterface?: InterfaceId; - /** - * Transport hints from sourceMetadata (e.g. reply-context cues). - * Forwarded to the session so the agent loop can incorporate them. - */ - hints?: string[]; - /** - * Brief UX context from sourceMetadata (e.g. UI surface description). - * Forwarded to the session so the agent loop can tailor its response. - */ - uxBrief?: string; - /** Assistant scope for multi-assistant channels. */ - assistantId?: string; - /** Guardian trust/identity context for the inbound actor. */ - guardianContext?: GuardianRuntimeContext; - /** Channel command intent metadata (e.g. Telegram /start). */ - commandIntent?: { type: string; payload?: string; languageCode?: string }; - /** Resolved channel context for this turn. */ - turnChannelContext?: TurnChannelContext; - /** - * When provided, agent-loop events are forwarded to this sink in real time. - * Used by voice relay for streaming TTS token delivery. - */ - eventSink?: VoiceRunEventSink; - /** - * When true, any confirmation_request from the prompter is immediately - * auto-denied instead of being stored for client polling. Used by the - * voice path when forceStrictSideEffects is active: the voice transport - * has no interactive approval UI, so without this flag the run would - * stall for the full permission timeout (300s by default). - */ - voiceAutoDenyConfirmations?: boolean; - /** - * When true, confirmation_request events are auto-approved immediately. - * Used for verified-guardian voice turns where there is no interactive - * approval UI but parity with guardian chat permissions is required. - */ - voiceAutoAllowConfirmations?: boolean; - /** - * When true, secret_request events are resolved immediately with a null - * value so voice turns do not stall waiting for a secret-entry UI that - * voice does not provide. - */ - voiceAutoResolveSecrets?: boolean; - /** - * Call-control protocol prompt injected into each voice turn so the - * model knows to emit control markers ([ASK_GUARDIAN:], [END_CALL], etc.). - */ - voiceCallControlPrompt?: string; -} - -// --------------------------------------------------------------------------- -// Orchestrator -// --------------------------------------------------------------------------- - -export class RunOrchestrator { - private pending = new Map(); - private deps: RunOrchestratorDeps; - - constructor(deps: RunOrchestratorDeps) { - this.deps = deps; - - // On startup, mark any runs left in non-terminal states as failed. - // These are orphans from a previous daemon process that was interrupted. - const recovered = runsStore.failOrphanedRuns(); - if (recovered > 0) { - log.info({ count: recovered }, 'Recovered orphaned runs from previous session'); - } - } - - /** - * Start a new run: persist the user message, create a run record, - * and fire the agent loop in the background. - * - * Returns a RunHandle containing the Run record and an abort() function - * that can cancel the in-flight agent loop (e.g. for voice barge-in). - */ - async startRun( - conversationId: string, - content: string, - attachmentIds?: string[], - options?: RunStartOptions, - signal?: AbortSignal, - ): Promise { - // Block inbound content that contains secrets — mirrors the IPC check in sessions.ts - const ingressCheck = checkIngressForSecrets(content); - if (ingressCheck.blocked) { - throw new IngressBlockedError(ingressCheck.userNotice!, ingressCheck.detectedTypes); - } - - // Build transport metadata when channel context is available so the - // session receives the same hints/uxBrief as the non-orchestrator path. - const transport = options?.sourceChannel - ? { - channelId: options.sourceChannel, - hints: options.hints, - uxBrief: options.uxBrief, - } - : undefined; - - const session = await this.deps.getOrCreateSession(conversationId, transport); - - if (session.isProcessing()) { - // Voice barge-in can race with turn teardown. Wait briefly for the - // previous run to finish aborting before giving up. - // The caller can pass an AbortSignal so superseded turns bail out - // of this wait early instead of occupying the session. - const maxWaitMs = 3000; - const pollIntervalMs = 50; - let waited = 0; - while (session.isProcessing() && waited < maxWaitMs) { - if (signal?.aborted) { - throw new Error('Run aborted while waiting for session'); - } - await new Promise(resolve => setTimeout(resolve, pollIntervalMs)); - waited += pollIntervalMs; - } - if (signal?.aborted) { - throw new Error('Run aborted while waiting for session'); - } - if (session.isProcessing()) { - throw new Error('Session is already processing a message'); - } - } - - // Determine the correct strictSideEffects value for this run: - // - explicit true/false from the caller → use that value - // - undefined → re-derive from the conversation's thread type so a - // prior run's forceStrictSideEffects=true doesn't stick on the - // cached session (private threads → true, standard → false) - const strictSideEffects = options?.forceStrictSideEffects - ?? this.deps.deriveDefaultStrictSideEffects(conversationId); - session.memoryPolicy = { - ...session.memoryPolicy, - strictSideEffects, - }; - session.setAssistantId(options?.assistantId ?? 'self'); - session.setGuardianContext(options?.guardianContext ?? null); - session.setCommandIntent(options?.commandIntent ?? null); - session.setTurnChannelContext(options?.turnChannelContext ?? { - userMessageChannel: parseChannelId(options?.sourceChannel) ?? 'vellum', - assistantMessageChannel: parseChannelId(options?.sourceChannel) ?? 'vellum', - }); - const resolvedInterface = parseInterfaceId(options?.sourceInterface) ?? ('vellum' as InterfaceId); - session.setTurnInterfaceContext({ - userMessageInterface: resolvedInterface, - assistantMessageInterface: resolvedInterface, - }); - - const attachments = attachmentIds - ? this.deps.resolveAttachments(attachmentIds) - : []; - - const requestId = crypto.randomUUID(); - const messageId = session.persistUserMessage(content, attachments, requestId); - const run = runsStore.createRun(conversationId, messageId); - - // Set channel capabilities based on the originating channel so capabilities - // (e.g. attachment scope) match the actual transport rather than always - // defaulting to 'vellum'. - session.setChannelCapabilities(resolveChannelCapabilities(options?.sourceChannel ?? 'vellum', resolvedInterface)); - session.setVoiceCallControlPrompt(options?.voiceCallControlPrompt ?? null); - - // Serialized publish chain so hub subscribers observe events in order. - let hubChain: Promise = Promise.resolve(); - const publishToHub = (msg: ServerMessage): void => { - const msgRecord = msg as unknown as Record; - const msgSessionId = - 'sessionId' in msg && typeof msgRecord.sessionId === 'string' - ? (msgRecord.sessionId as string) - : undefined; - const resolvedSessionId = msgSessionId ?? conversationId; - const event = buildAssistantEvent('self', msg, resolvedSessionId); - hubChain = (async () => { - await hubChain; - try { - await assistantEventHub.publish(event); - } catch (err) { - log.warn({ err }, 'assistant-events hub subscriber threw during HTTP run'); - } - })(); - }; - - - // Hook into session to intercept confirmation_request and secret_request events. - // When the prompter sends one of these, we record it in the run store so - // the client can poll and submit a decision/secret via the respective endpoint. - // Do NOT set hasNoClient — run sessions have a client (the HTTP caller). - const autoDeny = options?.voiceAutoDenyConfirmations === true; - const autoAllow = !autoDeny && options?.voiceAutoAllowConfirmations === true; - const autoResolveSecrets = options?.voiceAutoResolveSecrets === true; - let lastError: string | null = null; - session.updateClient((msg: ServerMessage) => { - if (msg.type === 'confirmation_request') { - if (autoDeny) { - // Voice path with strict side effects: immediately deny the - // confirmation request so the agent loop resumes without - // waiting for the full permission timeout (300s). The voice - // transport has no interactive approval UI, so polling would - // just stall. Security is preserved — the tool call is denied. - log.info( - { runId: run.id, toolName: msg.toolName }, - 'Auto-denying confirmation request for voice turn (forceStrictSideEffects)', - ); - session.handleConfirmationResponse( - msg.requestId, - 'deny', - undefined, - undefined, - `Permission denied for "${msg.toolName}": this voice call does not have interactive approval capabilities. Side-effect tools are not available for non-guardian voice callers. In your next assistant reply, explain briefly that this action requires guardian-level access and cannot be performed during this call.`, - ); - // Still publish to hub for observability, but skip run-store - // bookkeeping since the confirmation is already resolved. - publishToHub(msg); - return; - } - if (autoAllow) { - // Verified guardian voice turn: auto-approve so voice has the same - // permission capabilities as guardian chat despite lacking an - // interactive confirmation UI. - log.info( - { runId: run.id, toolName: msg.toolName }, - 'Auto-approving confirmation request for guardian voice turn', - ); - session.handleConfirmationResponse( - msg.requestId, - 'allow', - undefined, - undefined, - `Permission approved for "${msg.toolName}": this is a verified guardian voice call.`, - ); - // Publish for observability, but skip run-store pending state since - // the request is already resolved. - publishToHub(msg); - return; - } - - runsStore.setRunConfirmation(run.id, { - toolName: msg.toolName, - toolUseId: msg.requestId, - input: msg.input, - riskLevel: msg.riskLevel, - executionTarget: msg.executionTarget, - allowlistOptions: msg.allowlistOptions, - scopeOptions: msg.scopeOptions, - persistentDecisionsAllowed: msg.persistentDecisionsAllowed, - }); - this.pending.set(run.id, { - prompterRequestId: msg.requestId, - session, - }); - } else if (msg.type === 'secret_request') { - if (autoResolveSecrets) { - // Voice has no secret-entry UI, so resolve immediately to avoid - // waiting for the full secret prompt timeout. - log.info( - { runId: run.id, service: msg.service, field: msg.field }, - 'Auto-resolving secret request for voice turn (no secret-entry UI)', - ); - session.handleSecretResponse(msg.requestId, undefined, 'store'); - publishToHub(msg); - return; - } - - runsStore.setRunSecret(run.id, { - requestId: msg.requestId, - service: msg.service, - field: msg.field, - label: msg.label, - description: msg.description, - placeholder: msg.placeholder, - purpose: msg.purpose, - allowOneTimeSend: msg.allowOneTimeSend, - }); - this.pending.set(run.id, { - prompterRequestId: msg.requestId, - session, - }); - } - // Mirror every outbound message to the assistant-events hub so SSE - // subscribers receive the same payload parity as IPC clients. - publishToHub(msg); - }); - - // Fire-and-forget the agent loop - const cleanup = () => { - this.pending.delete(run.id); - // Reset channel capabilities so a subsequent IPC/desktop session on the - // same conversation is not incorrectly treated as an HTTP-API client. - session.setChannelCapabilities(null); - session.setGuardianContext(null); - session.setCommandIntent(null); - session.setAssistantId('self'); - session.setVoiceCallControlPrompt(null); - // Reset turn interface context so a subsequent IPC/desktop session on the - // same conversation is not incorrectly treated as an HTTP-API interface. - session.setTurnInterfaceContext({ - userMessageInterface: 'vellum' as InterfaceId, - assistantMessageInterface: 'vellum' as InterfaceId, - }); - // Reset the session's client callback to a no-op so the stale - // closure doesn't intercept events from future runs on the same session. - // Set hasNoClient=true here since the run is done and no HTTP caller - // is actively listening — truly no client at this point. - session.updateClient(() => {}, true); - }; - - const eventSink = options?.eventSink; - - void (async () => { - try { - await session.runAgentLoop(content, messageId, (msg: ServerMessage) => { - if (msg.type === 'error') { - lastError = msg.message; - } else if (msg.type === 'session_error') { - lastError = msg.userMessage; - } - // Mirror agent-loop events (assistant_text_delta, message_complete, - // tool_use_start, tool_result, etc.) to the hub. These travel through - // the onEvent path, distinct from the updateClient path used by the - // prompter (confirmation_request). Both paths must publish so SSE - // consumers receive the full response stream. - publishToHub(msg); - - // Forward voice-relevant events to the real-time event sink when - // provided. This runs in addition to (not instead of) the hub - // publication above so both paths remain active. - if (eventSink) { - if (msg.type === 'assistant_text_delta') { - eventSink.onTextDelta(msg.text); - } else if (msg.type === 'message_complete') { - eventSink.onMessageComplete(); - } else if (msg.type === 'generation_cancelled') { - // Treat cancellation as a completed turn so the voice - // turnComplete promise settles instead of hanging forever. - eventSink.onMessageComplete(); - } else if (msg.type === 'error') { - eventSink.onError(msg.message); - } else if (msg.type === 'session_error') { - eventSink.onError(msg.userMessage); - } else if (msg.type === 'tool_use_start') { - eventSink.onToolUse(msg.toolName, msg.input); - } - } - }); - if (lastError) { - log.error({ runId: run.id, error: lastError }, 'Run failed (error event from agent loop)'); - runsStore.failRun(run.id, lastError); - } else { - runsStore.completeRun(run.id); - } - } catch (err) { - const message = err instanceof Error ? err.message : String(err); - log.error({ err, runId: run.id }, 'Run failed'); - runsStore.failRun(run.id, message); - // Notify the voice event sink so the caller's turnComplete - // promise settles instead of hanging on unhandled exceptions. - if (eventSink) { - eventSink.onError(message); - } - } finally { - cleanup(); - } - })(); - - return { - run, - // Scope the abort to this specific run by capturing the requestId. - // If the session has moved on to a new turn (different currentRequestId), - // this abort is stale and becomes a no-op — preventing voice barge-in - // from cancelling unrelated turns. - abort: () => { - if (session.currentRequestId === requestId) { - session.abort(); - } - }, - }; - } - - /** Read current run state from the store. */ - getRun(runId: string): Run | null { - return runsStore.getRun(runId); - } - - /** - * Submit a permission decision for a pending confirmation. - * - * Returns: - * - `'applied'` – decision was applied or already handled (idempotent) - * - `'run_not_found'` – no run exists with the given ID - * - `'no_pending_decision'` – run exists but is not awaiting a confirmation - */ - submitDecision( - runId: string, - decision: UserDecision, - decisionContext?: string, - ): 'applied' | 'run_not_found' | 'no_pending_decision' { - const pendingState = this.pending.get(runId); - if (pendingState) { - runsStore.clearRunConfirmation(runId); - pendingState.session.handleConfirmationResponse( - pendingState.prompterRequestId, - decision, - undefined, - undefined, - decisionContext, - ); - this.pending.delete(runId); - return 'applied'; - } - - // No in-memory pending state — check if the run exists. - const run = runsStore.getRun(runId); - if (!run) return 'run_not_found'; - - // If the run is still needs_confirmation but there's no in-memory - // state, the prompter already timed out and auto-denied. Fail the - // run rather than clearing to 'running', since no agent loop exists - // to complete it. - if (run.status === 'needs_confirmation') { - runsStore.failRun(runId, 'Prompter timed out (no active handler)'); - return 'applied'; - } - - // Terminal states (completed/failed) mean the decision was already - // handled (double-submit). Treat as idempotent success. - if (run.status === 'completed' || run.status === 'failed') { - return 'applied'; - } - - // Run is in 'running' state with no pending confirmation — the - // agent loop hasn't reached a confirmation point yet. Reject so - // the client doesn't mistakenly treat the decision as accepted. - return 'no_pending_decision'; - } - - /** - * Submit a secret value for a pending secret request. - * - * Returns: - * - `'applied'` – secret was forwarded to the session - * - `'run_not_found'` – no run exists with the given ID - * - `'no_pending_secret'` – run exists but is not awaiting a secret - */ - submitSecret( - runId: string, - value?: string, - delivery?: 'store' | 'transient_send', - ): 'applied' | 'run_not_found' | 'no_pending_secret' { - const pendingState = this.pending.get(runId); - if (pendingState) { - runsStore.clearRunSecret(runId); - pendingState.session.handleSecretResponse( - pendingState.prompterRequestId, - value, - delivery, - ); - this.pending.delete(runId); - return 'applied'; - } - - const run = runsStore.getRun(runId); - if (!run) return 'run_not_found'; - - if (run.status === 'needs_secret') { - runsStore.failRun(runId, 'Secret prompter timed out (no active handler)'); - return 'applied'; - } - - if (run.status === 'completed' || run.status === 'failed') { - return 'applied'; - } - - return 'no_pending_secret'; - } -}