Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
311 changes: 308 additions & 3 deletions assistant/src/__tests__/run-orchestrator.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import { initializeDb, getDb, resetDb } from '../memory/db.js';
import { createConversation } from '../memory/conversation-store.js';
import { createRun, getRun, setRunConfirmation } from '../memory/runs-store.js';
import { RunOrchestrator } from '../runtime/run-orchestrator.js';
import type { VoiceRunEventSink } from '../runtime/run-orchestrator.js';
import type { ChannelCapabilities } from '../daemon/session-runtime-assembly.js';

initializeDb();
Expand Down Expand Up @@ -110,7 +111,7 @@ describe('run failure detection', () => {
deriveDefaultStrictSideEffects: () => false,
});

const run = await orchestrator.startRun(conversation.id, 'Hello');
const { run } = await orchestrator.startRun(conversation.id, 'Hello');

// The agent loop fires asynchronously; give it a tick to settle.
await new Promise((r) => setTimeout(r, 50));
Expand All @@ -133,7 +134,7 @@ describe('run failure detection', () => {
deriveDefaultStrictSideEffects: () => false,
});

const run = await orchestrator.startRun(conversation.id, 'Hello');
const { run } = await orchestrator.startRun(conversation.id, 'Hello');

await new Promise((r) => setTimeout(r, 50));

Expand Down Expand Up @@ -212,7 +213,7 @@ describe('run approval state executionTarget', () => {
deriveDefaultStrictSideEffects: () => false,
});

const run = await orchestrator.startRun(conversation.id, 'Run host command');
const { run } = await orchestrator.startRun(conversation.id, 'Run host command');
const stored = orchestrator.getRun(run.id);
expect(stored?.status).toBe('needs_confirmation');
expect(stored?.pendingConfirmation?.executionTarget).toBe('host');
Expand Down Expand Up @@ -461,3 +462,307 @@ describe('strictSideEffects re-derivation across runs', () => {
expect((session as unknown as { memoryPolicy: { strictSideEffects: boolean } }).memoryPolicy.strictSideEffects).toBe(false);
});
});

// ═══════════════════════════════════════════════════════════════════════════
// VoiceRunEventSink forwarding
// ═══════════════════════════════════════════════════════════════════════════

/**
 * Checks that a VoiceRunEventSink handed to `startRun` is invoked for the
 * session events exercised here (text deltas, errors, tool-use starts), and
 * that a run without a sink still reaches the `completed` status.
 */
describe('eventSink forwarding', () => {
  /** Delay that lets the asynchronously started agent loop finish. */
  const SETTLE_MS = 50;

  /** Resolves once the settle delay has elapsed. */
  const settle = (): Promise<void> =>
    new Promise((resolve) => setTimeout(resolve, SETTLE_MS));

  /** Wires an orchestrator that always hands back the supplied mock session. */
  const orchestratorFor = (session: ReturnType<typeof makeSessionWithEvent>) =>
    new RunOrchestrator({
      getOrCreateSession: async () => session,
      resolveAttachments: () => [],
      deriveDefaultStrictSideEffects: () => false,
    });

  /** Builds a sink whose callbacks are no-ops except for the given overrides. */
  const sinkWith = (overrides: Partial<VoiceRunEventSink>): VoiceRunEventSink => ({
    onTextDelta: () => {},
    onMessageComplete: () => {},
    onError: () => {},
    onToolUse: () => {},
    ...overrides,
  });

  // Start every test from empty run, message and conversation tables.
  beforeEach(() => {
    const db = getDb();
    for (const table of ['message_runs', 'messages', 'conversations']) {
      db.run(`DELETE FROM ${table}`);
    }
  });

  test('eventSink receives assistant_text_delta events', async () => {
    const conversation = createConversation('event sink delta test');
    const delta: ServerMessage = {
      type: 'assistant_text_delta',
      text: 'Hello from agent',
      sessionId: conversation.id,
    };
    const session = makeSessionWithEvent(delta);

    const seenText: string[] = [];
    const sink = sinkWith({ onTextDelta: (text) => seenText.push(text) });

    const orchestrator = orchestratorFor(session);
    await orchestrator.startRun(conversation.id, 'Hello', undefined, {
      eventSink: sink,
    });
    await settle();

    expect(seenText).toEqual(['Hello from agent']);
  });

  test('eventSink receives error events', async () => {
    const conversation = createConversation('event sink error test');
    const failure: ServerMessage = {
      type: 'error',
      message: 'Something broke',
    };
    const session = makeSessionWithEvent(failure);

    const seenErrors: string[] = [];
    const sink = sinkWith({ onError: (msg) => seenErrors.push(msg) });

    const orchestrator = orchestratorFor(session);
    await orchestrator.startRun(conversation.id, 'Hello', undefined, {
      eventSink: sink,
    });
    await settle();

    expect(seenErrors).toEqual(['Something broke']);
  });

  test('eventSink receives tool_use_start events', async () => {
    const conversation = createConversation('event sink tool test');
    const toolStart: ServerMessage = {
      type: 'tool_use_start',
      toolName: 'web_search',
      input: { query: 'test' },
      sessionId: conversation.id,
    };
    const session = makeSessionWithEvent(toolStart);

    const seenTools: Array<{ name: string; input: Record<string, unknown> }> = [];
    const sink = sinkWith({
      onToolUse: (name, input) => seenTools.push({ name, input }),
    });

    const orchestrator = orchestratorFor(session);
    await orchestrator.startRun(conversation.id, 'Hello', undefined, {
      eventSink: sink,
    });
    await settle();

    expect(seenTools).toHaveLength(1);
    const [firstCall] = seenTools;
    expect(firstCall.name).toBe('web_search');
    expect(firstCall.input).toEqual({ query: 'test' });
  });

  test('no events forwarded when eventSink is not provided', async () => {
    const conversation = createConversation('no sink test');
    const delta: ServerMessage = {
      type: 'assistant_text_delta',
      text: 'Hello',
      sessionId: conversation.id,
    };
    const session = makeSessionWithEvent(delta);
    const orchestrator = orchestratorFor(session);

    // Starting a run without a sink must not throw.
    const { run } = await orchestrator.startRun(conversation.id, 'Hello');
    await settle();

    const persisted = orchestrator.getRun(run.id);
    expect(persisted?.status).toBe('completed');
  });
});

// ═══════════════════════════════════════════════════════════════════════════
// Run abort / cancellation
// ═══════════════════════════════════════════════════════════════════════════

/**
 * Exercises the handle returned by `startRun`: it exposes `abort`, calling it
 * reaches `session.abort()` while the session is still on that run's request
 * id, and a handle from an earlier run is a no-op once the session has moved
 * on to a newer request id.
 */
describe('run abort', () => {
  beforeEach(() => {
    const db = getDb();
    db.run('DELETE FROM message_runs');
    db.run('DELETE FROM messages');
    db.run('DELETE FROM conversations');
  });

  /**
   * Builds the minimal Session stand-in shared by the abort tests.
   *
   * `persistUserMessage` records the request id it receives on
   * `currentRequestId`; the tests below rely on that to tell whether an abort
   * handle is still current.
   *
   * @param options.loopMs  How long the mock `runAgentLoop` stays pending, in
   *                        milliseconds. `0` (the default) resolves without a timer.
   * @param options.onAbort Invoked whenever `session.abort()` is called.
   */
  function makeAbortableSession(
    options: { loopMs?: number; onAbort?: () => void } = {},
  ): Session {
    const { loopMs = 0, onAbort = () => {} } = options;

    const session = {
      isProcessing: () => false,
      currentRequestId: undefined as string | undefined,
      persistUserMessage: (_c: string, _a: unknown[], reqId: string) => {
        session.currentRequestId = reqId;
        return undefined as unknown as string;
      },
      memoryPolicy: { scopeId: 'default', includeDefaultFallback: false, strictSideEffects: false },
      setChannelCapabilities: () => {},
      setAssistantId: () => {},
      setGuardianContext: () => {},
      setCommandIntent: () => {},
      setTurnChannelContext: () => {},
      updateClient: () => {},
      runAgentLoop: async () => {
        if (loopMs > 0) {
          await new Promise((r) => setTimeout(r, loopMs));
        }
      },
      handleConfirmationResponse: () => {},
      abort: () => {
        onAbort();
      },
    } as unknown as Session;

    return session;
  }

  /** Wires an orchestrator that always resolves to the supplied session. */
  function makeOrchestrator(session: Session): RunOrchestrator {
    return new RunOrchestrator({
      getOrCreateSession: async () => session,
      resolveAttachments: () => [],
      deriveDefaultStrictSideEffects: () => false,
    });
  }

  test('startRun returns an abort function', async () => {
    const conversation = createConversation('abort handle test');
    const orchestrator = makeOrchestrator(makeAbortableSession());

    const handle = await orchestrator.startRun(conversation.id, 'Hello');
    expect(typeof handle.abort).toBe('function');
    expect(handle.run.id).toBeDefined();
  });

  test('aborting a run does not crash session state', async () => {
    const conversation = createConversation('abort safety test');
    let abortCalled = false;

    // Simulate a long-running agent loop (200ms) so the abort lands mid-run.
    const session = makeAbortableSession({
      loopMs: 200,
      onAbort: () => { abortCalled = true; },
    });
    const orchestrator = makeOrchestrator(session);

    const handle = await orchestrator.startRun(conversation.id, 'Hello');

    // Abort immediately — session still has same requestId
    handle.abort();
    expect(abortCalled).toBe(true);

    // Wait for cleanup to settle (longer than the 200ms mock loop).
    await new Promise((r) => setTimeout(r, 300));

    // Session state should not be corrupted — the mock runAgentLoop resolves
    // after 200ms regardless of the abort, and the run must still be retrievable.
    const stored = orchestrator.getRun(handle.run.id);
    expect(stored).not.toBeNull();
  });

  test('stale abort handle is a no-op when session has moved to a new run', async () => {
    const conversation = createConversation('stale abort test');
    let abortCalled = false;

    const session = makeAbortableSession({
      onAbort: () => { abortCalled = true; },
    });
    const orchestrator = makeOrchestrator(session);

    // Start first run and capture its handle
    const handle1 = await orchestrator.startRun(conversation.id, 'First turn');
    await new Promise((r) => setTimeout(r, 50));

    // Start second run — session's currentRequestId now belongs to run 2.
    // Its handle is not needed, so the result is intentionally discarded.
    await orchestrator.startRun(conversation.id, 'Second turn');

    // Attempt to abort using the stale handle from run 1.
    // Since the session has moved to a new requestId, this should be a no-op.
    handle1.abort();
    expect(abortCalled).toBe(false);
  });

  test('abort works when session still has matching requestId', async () => {
    const conversation = createConversation('matching abort test');
    let abortCalled = false;

    // Keep the agent loop running so the session stays on this requestId.
    // NOTE(review): the mock loop stays pending for ~500ms after this test
    // returns; confirm that its late completion cannot interfere with later tests.
    const session = makeAbortableSession({
      loopMs: 500,
      onAbort: () => { abortCalled = true; },
    });
    const orchestrator = makeOrchestrator(session);

    const handle = await orchestrator.startRun(conversation.id, 'Hello');

    // Abort while the session is still processing this run
    handle.abort();
    expect(abortCalled).toBe(true);
  });
});
8 changes: 4 additions & 4 deletions assistant/src/__tests__/runtime-runs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ describe('runtime runs — swarm lifecycle', () => {
deriveDefaultStrictSideEffects: () => false,
});

const run = await orchestrator.startRun(conversation.id, 'Build a feature');
const { run } = await orchestrator.startRun(conversation.id, 'Build a feature');
expect(run.status).toBe('running');

// Wait for agent loop to complete
Expand All @@ -181,7 +181,7 @@ describe('runtime runs — swarm lifecycle', () => {
deriveDefaultStrictSideEffects: () => false,
});

const run = await orchestrator.startRun(conversation.id, 'Run swarm');
const { run } = await orchestrator.startRun(conversation.id, 'Run swarm');

await new Promise((r) => setTimeout(r, 50));

Expand All @@ -198,7 +198,7 @@ describe('runtime runs — swarm lifecycle', () => {
deriveDefaultStrictSideEffects: () => false,
});

const run = await orchestrator.startRun(conversation.id, 'Delegate a swarm task');
const { run } = await orchestrator.startRun(conversation.id, 'Delegate a swarm task');

// Give agent loop time to emit confirmation_request
await new Promise((r) => setTimeout(r, 50));
Expand All @@ -216,7 +216,7 @@ describe('runtime runs — swarm lifecycle', () => {
deriveDefaultStrictSideEffects: () => false,
});

const run = await orchestrator.startRun(conversation.id, 'Run with approval');
const { run } = await orchestrator.startRun(conversation.id, 'Run with approval');
await new Promise((r) => setTimeout(r, 50));

// Verify pending state
Expand Down
Loading