Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 122 additions & 0 deletions assistant/src/__tests__/session-queue.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { describe, expect, mock, test, beforeEach } from 'bun:test';
import { rmSync, writeFileSync } from 'node:fs';
import type { Message, ProviderResponse } from '../providers/types.js';
import type { AgentEvent, CheckpointInfo, CheckpointDecision } from '../agent/loop.js';
import type { ServerMessage } from '../daemon/ipc-protocol.js';
Expand Down Expand Up @@ -34,6 +35,7 @@ mock.module('../config/loader.js', () => ({
maxSummaryTokens: 512,
},
rateLimit: { maxRequestsPerMinute: 0, maxTokensPerSession: 0 },
timeouts: { permissionTimeoutSec: 1 },
apiKeys: {},
skills: { entries: {}, allowBundled: true },
memory: { retrieval: { injectionStrategy: 'inline' } },
Expand Down Expand Up @@ -65,6 +67,8 @@ mock.module('../skills/slash-commands.js', () => ({
}));

mock.module('../permissions/trust-store.js', () => ({
addRule: () => {},
findHighestPriorityRule: () => null,
clearCache: () => {},
}));

Expand All @@ -91,6 +95,11 @@ mock.module('../memory/conversation-store.js', () => ({
updateConversationTitle: () => {},
}));

mock.module('../memory/attachments-store.js', () => ({
uploadAttachment: () => ({ id: `att-${Date.now()}` }),
linkAttachmentToMessage: () => {},
}));

mock.module('../memory/retriever.js', () => ({
buildMemoryRecall: async () => ({
enabled: false,
Expand Down Expand Up @@ -208,6 +217,16 @@ async function waitForPendingRun(count: number, timeoutMs = 2000): Promise<void>
}
}

async function waitForCondition(predicate: () => boolean, timeoutMs = 2000): Promise<void> {
const start = Date.now();
while (!predicate()) {
if (Date.now() - start > timeoutMs) {
throw new Error('Timed out waiting for condition');
}
await new Promise((r) => setTimeout(r, 10));
}
}

/**
* Resolve the Nth pending AgentLoop.run() call. Fires the minimal events
* that `runAgentLoop` expects (usage + message_complete) so the session
Expand Down Expand Up @@ -1090,6 +1109,109 @@ describe('Surface-action queue-full trace', () => {
});
});

// ---------------------------------------------------------------------------
// Host attachment approval tests
// ---------------------------------------------------------------------------

describe('Session host attachment directives', () => {
beforeEach(() => {
pendingRuns = [];
});

test('host attachment prompts and resolves when user allows', async () => {
const hostPath = '/tmp/vellum-host-attachment-allow.txt';
writeFileSync(hostPath, 'host attachment content');

try {
const clientEvents: ServerMessage[] = [];
const events: ServerMessage[] = [];
const session = makeSession((msg) => clientEvents.push(msg));
await session.loadFromDb();

const p1 = session.processMessage('msg-1', [], (e) => events.push(e), 'req-1');
await waitForPendingRun(1);

const run = pendingRuns[0];
const assistantMsg: Message = {
role: 'assistant',
content: [
{
type: 'text',
text: `Here is your file.\n<vellum-attachment source="host" path="${hostPath}" />`,
},
],
};
run.onEvent({ type: 'usage', inputTokens: 10, outputTokens: 5, model: 'mock', providerDurationMs: 100 });
run.onEvent({ type: 'message_complete', message: assistantMsg });
run.resolve([...run.messages, assistantMsg]);

await waitForCondition(() => clientEvents.some((e) => e.type === 'confirmation_request'));
const confirmation = clientEvents.find((e) => e.type === 'confirmation_request');
expect(confirmation).toBeDefined();
session.handleConfirmationResponse((confirmation as { requestId: string }).requestId, 'allow');

await p1;

expect(session.lastAssistantAttachments).toHaveLength(1);
expect(session.lastAssistantAttachments[0].sourceType).toBe('host_file');
expect(session.lastAttachmentWarnings).toHaveLength(0);

const completion = events.find((e) => e.type === 'message_complete');
expect(completion).toBeDefined();
} finally {
rmSync(hostPath, { force: true });
}
});

test('host attachment denial is non-fatal and emits warning text', async () => {
const hostPath = '/tmp/vellum-host-attachment-deny.txt';
writeFileSync(hostPath, 'host attachment content');

try {
const clientEvents: ServerMessage[] = [];
const events: ServerMessage[] = [];
const session = makeSession((msg) => clientEvents.push(msg));
await session.loadFromDb();

const p1 = session.processMessage('msg-1', [], (e) => events.push(e), 'req-1');
await waitForPendingRun(1);

const run = pendingRuns[0];
const assistantMsg: Message = {
role: 'assistant',
content: [
{
type: 'text',
text: `Here is your file.\n<vellum-attachment source="host" path="${hostPath}" />`,
},
],
};
run.onEvent({ type: 'usage', inputTokens: 10, outputTokens: 5, model: 'mock', providerDurationMs: 100 });
run.onEvent({ type: 'message_complete', message: assistantMsg });
run.resolve([...run.messages, assistantMsg]);

await waitForCondition(() => clientEvents.some((e) => e.type === 'confirmation_request'));
const confirmation = clientEvents.find((e) => e.type === 'confirmation_request');
expect(confirmation).toBeDefined();
session.handleConfirmationResponse((confirmation as { requestId: string }).requestId, 'deny');

await p1;

expect(session.lastAssistantAttachments).toHaveLength(0);
expect(session.lastAttachmentWarnings.some((w) => w.includes('access denied by user'))).toBe(true);

const warningDelta = events.find(
(e) => e.type === 'assistant_text_delta' && e.text.includes('Attachment warning:'),
);
expect(warningDelta).toBeDefined();
const completion = events.find((e) => e.type === 'message_complete');
expect(completion).toBeDefined();
} finally {
rmSync(hostPath, { force: true });
}
});
});

// ---------------------------------------------------------------------------
// Regression: cancel semantics + session/global error channel split
// ---------------------------------------------------------------------------
Expand Down
2 changes: 2 additions & 0 deletions assistant/src/daemon/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,7 @@ export class DaemonServer {
attachmentIds?: string[],
): Promise<{ messageId: string }> {
const session = await this.getOrCreateSession(conversationId);
session.setAssistantId(assistantId);
Comment thread
siddseethepalli marked this conversation as resolved.

// Reject concurrent requests upfront. The HTTP path should never use
// the message queue — it returns 409 to the caller instead.
Expand Down Expand Up @@ -633,6 +634,7 @@ export class DaemonServer {
attachmentIds?: string[],
): Promise<{ messageId: string }> {
const session = await this.getOrCreateSession(conversationId);
session.setAssistantId(assistantId);

if (session.isProcessing()) {
throw new Error('Session is already processing a message');
Expand Down
84 changes: 66 additions & 18 deletions assistant/src/daemon/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,17 @@ import type { Provider } from '../providers/types.js';
import { createUserMessage, createAssistantMessage } from '../agent/message-types.js';
import * as conversationStore from '../memory/conversation-store.js';
import { uploadAttachment, linkAttachmentToMessage } from '../memory/attachments-store.js';
import { getApp } from '../memory/app-store.js';
import { PermissionPrompter } from '../permissions/prompter.js';
import { SecretPrompter } from '../permissions/secret-prompter.js';
import { addRule, findHighestPriorityRule } from '../permissions/trust-store.js';
import { check, classifyRisk, generateAllowlistOptions, generateScopeOptions } from '../permissions/checker.js';
import { ToolExecutor } from '../tools/executor.js';
import type { ToolLifecycleEventHandler, ToolExecutionResult } from '../tools/types.js';
import { getAllToolDefinitions } from '../tools/registry.js';
import { allUiSurfaceTools } from '../tools/ui-surface/definitions.js';
import { allAppTools } from '../tools/apps/definitions.js';
import { requestComputerControlTool } from '../tools/computer-use/request-computer-control.js';
import type { UserDecision } from '../permissions/types.js';
import { generateScopeOptions } from '../permissions/checker.js';
import { getConfig } from '../config/loader.js';
import { estimateCost, resolvePricing } from '../util/pricing.js';
import { getLogger } from '../util/logger.js';
Expand Down Expand Up @@ -114,6 +113,7 @@ export class Session {
private contextWindowManager: ContextWindowManager;
private contextCompactedMessageCount = 0;
private currentRequestId?: string;
private assistantId: string | null = null;
private messageQueue: QueuedMessage[] = [];
private pendingSurfaceActions = new Map<string, {
surfaceType: SurfaceType;
Expand Down Expand Up @@ -414,6 +414,54 @@ export class Session {
this.secretPrompter.resolveSecret(requestId, value);
}

/**
* Bind a runtime assistant ID to this session.
* IPC-only desktop sessions can leave this unset and use a local scope.
*/
setAssistantId(assistantId: string): void {
this.assistantId = assistantId;
}

private async approveHostAttachmentRead(filePath: string): Promise<boolean> {
const toolName = 'host_file_read';
const input = { path: filePath };
const decision = await check(toolName, input, this.workingDir);

if (decision.decision === 'allow') {
return true;
}
if (decision.decision === 'deny') {
return false;
}

const response = await this.prompter.prompt(
Comment thread
siddseethepalli marked this conversation as resolved.
toolName,
input,
await classifyRisk(toolName, input),
generateAllowlistOptions(toolName, input),
generateScopeOptions(this.workingDir, toolName),
undefined,
undefined,
this.conversationId,
'host',
);

if (response.decision === 'always_allow' && response.selectedPattern && response.selectedScope) {
addRule(toolName, response.selectedPattern, response.selectedScope);
}
if (response.decision === 'always_deny' && response.selectedPattern && response.selectedScope) {
addRule(toolName, response.selectedPattern, response.selectedScope, 'deny');
}

return response.decision === 'allow' || response.decision === 'always_allow';
}

private formatAttachmentWarnings(warnings: string[]): string | null {
if (warnings.length === 0) return null;
const lines = warnings.map((warning) => `Attachment warning: ${warning}`);
return `\n\n${lines.join('\n')}`;
}

/**
* Persist a user message and mark the session as processing.
* Returns the messageId immediately without running the agent loop.
Expand Down Expand Up @@ -867,10 +915,7 @@ export class Session {
// Resolve accumulated attachment directives and tool content blocks
let assistantAttachments: AssistantAttachmentDraft[] = [];
if (accumulatedDirectives.length > 0 || accumulatedToolContentBlocks.length > 0) {
const approveHostRead: ApproveHostRead = async (_filePath) => {
// TODO(PR-6+): Wire to permission prompter for interactive approval
return false;
};
const approveHostRead: ApproveHostRead = async (filePath) => this.approveHostAttachmentRead(filePath);

const directiveDrafts = accumulatedDirectives.length > 0
? await resolveDirectives(accumulatedDirectives, this.workingDir, approveHostRead)
Expand All @@ -887,24 +932,27 @@ export class Session {

// Persist resolved attachments and link to the last assistant message
if (assistantAttachments.length > 0 && lastAssistantMessageId) {
const assistantId = getApp()?.id;
if (assistantId) {
for (let i = 0; i < assistantAttachments.length; i++) {
const draft = assistantAttachments[i];
const stored = uploadAttachment(
assistantId,
draft.filename,
draft.mimeType,
draft.dataBase64,
);
linkAttachmentToMessage(lastAssistantMessageId, stored.id, i);
}
const attachmentScope = this.assistantId ?? 'local-assistant';
Comment thread
siddseethepalli marked this conversation as resolved.
for (let i = 0; i < assistantAttachments.length; i++) {
const draft = assistantAttachments[i];
const stored = uploadAttachment(
attachmentScope,
draft.filename,
draft.mimeType,
draft.dataBase64,
);
linkAttachmentToMessage(lastAssistantMessageId, stored.id, i);
}
}

this.lastAssistantAttachments = assistantAttachments;
this.lastAttachmentWarnings = directiveWarnings;

const warningText = this.formatAttachmentWarnings(directiveWarnings);
if (warningText) {
onEvent({ type: 'assistant_text_delta', text: warningText, sessionId: this.conversationId });
}

if (yieldedForHandoff) {
this.traceEmitter.emit('generation_handoff', 'Handing off to next queued message', {
requestId: reqId,
Expand Down
Loading