diff --git a/assistant/src/__tests__/send-endpoint-busy.test.ts b/assistant/src/__tests__/send-endpoint-busy.test.ts
new file mode 100644
index 00000000000..1ff37c409ef
--- /dev/null
+++ b/assistant/src/__tests__/send-endpoint-busy.test.ts
@@ -0,0 +1,280 @@
+/**
+ * Tests for POST /v1/messages queue-if-busy behavior and hub publishing.
+ *
+ * Validates that:
+ * - Messages are accepted (202) when the session is idle, with hub events published.
+ * - Messages are queued (202, queued: true) when the session is busy, not 409.
+ * - SSE subscribers receive events from messages sent via this endpoint.
+ */
+import { describe, test, expect, beforeEach, afterEach, afterAll, mock } from 'bun:test';
+import { mkdtempSync, rmSync, realpathSync } from 'node:fs';
+import { tmpdir } from 'node:os';
+import { join } from 'node:path';
+import type { ServerMessage } from '../daemon/ipc-protocol.js';
+import type { Session } from '../daemon/session.js';
+
+const testDir = realpathSync(mkdtempSync(join(tmpdir(), 'send-endpoint-busy-test-')));
+
+mock.module('../util/platform.js', () => ({
+  getRootDir: () => testDir,
+  getDataDir: () => testDir,
+  isMacOS: () => process.platform === 'darwin',
+  isLinux: () => process.platform === 'linux',
+  isWindows: () => process.platform === 'win32',
+  getSocketPath: () => join(testDir, 'test.sock'),
+  getPidPath: () => join(testDir, 'test.pid'),
+  getDbPath: () => join(testDir, 'test.db'),
+  getLogPath: () => join(testDir, 'test.log'),
+  ensureDataDir: () => {},
+}));
+
+mock.module('../util/logger.js', () => ({
+  getLogger: () => new Proxy({} as Record<string, unknown>, {
+    get: () => () => {},
+  }),
+}));
+
+mock.module('../config/loader.js', () => ({
+  getConfig: () => ({
+    model: 'test',
+    provider: 'test',
+    apiKeys: {},
+    memory: { enabled: false },
+    rateLimit: { maxRequestsPerMinute: 0, maxTokensPerSession: 0 },
+    secretDetection: { enabled: false },
+  }),
+}));
+
+import { initializeDb, getDb, resetDb } from '../memory/db.js';
+import { RuntimeHttpServer } from '../runtime/http-server.js';
+import { AssistantEventHub } from '../runtime/assistant-event-hub.js';
+import type { AssistantEvent } from '../runtime/assistant-event.js';
+
+initializeDb();
+
+// ---------------------------------------------------------------------------
+// Session helpers
+// ---------------------------------------------------------------------------
+
+/** Session that completes its agent loop quickly and emits a text delta + message_complete. */
+function makeCompletingSession(): Session {
+  let processing = false;
+  return {
+    isProcessing: () => processing,
+    persistUserMessage: (_content: string, _attachments: unknown[], requestId?: string) => {
+      processing = true;
+      return requestId ?? 'msg-1';
+    },
+    memoryPolicy: { scopeId: 'default', includeDefaultFallback: false, strictSideEffects: false },
+    setChannelCapabilities: () => {},
+    setAssistantId: () => {},
+    setGuardianContext: () => {},
+    setCommandIntent: () => {},
+    updateClient: () => {},
+    enqueueMessage: () => ({ queued: false, requestId: 'noop' }),
+    runAgentLoop: async (_content: string, _messageId: string, onEvent: (msg: ServerMessage) => void) => {
+      onEvent({ type: 'assistant_text_delta', text: 'Hello!' });
+      onEvent({ type: 'message_complete', sessionId: 'test-session' });
+      processing = false;
+    },
+    handleConfirmationResponse: () => {},
+    handleSecretResponse: () => {},
+  } as unknown as Session;
+}
+
+/** Session that hangs forever in the agent loop (simulates a busy session). */
+function makeHangingSession(): Session {
+  let processing = false;
+  const enqueuedMessages: Array<{ content: string; onEvent: (msg: ServerMessage) => void; requestId: string }> = [];
+  return {
+    isProcessing: () => processing,
+    persistUserMessage: (_content: string, _attachments: unknown[], requestId?: string) => {
+      processing = true;
+      return requestId ?? 'msg-1';
+    },
+    memoryPolicy: { scopeId: 'default', includeDefaultFallback: false, strictSideEffects: false },
+    setChannelCapabilities: () => {},
+    setAssistantId: () => {},
+    setGuardianContext: () => {},
+    setCommandIntent: () => {},
+    updateClient: () => {},
+    enqueueMessage: (content: string, _attachments: unknown[], onEvent: (msg: ServerMessage) => void, requestId: string) => {
+      enqueuedMessages.push({ content, onEvent, requestId });
+      return { queued: true, requestId };
+    },
+    runAgentLoop: async () => {
+      // Hang forever
+      await new Promise(() => {});
+    },
+    handleConfirmationResponse: () => {},
+    handleSecretResponse: () => {},
+    _enqueuedMessages: enqueuedMessages,
+  } as unknown as Session;
+}
+
+// ---------------------------------------------------------------------------
+// Tests
+// ---------------------------------------------------------------------------
+
+const TEST_TOKEN = 'test-bearer-token-send';
+const AUTH_HEADERS = { Authorization: `Bearer ${TEST_TOKEN}` };
+
+describe('POST /v1/messages — queue-if-busy and hub publishing', () => {
+  let server: RuntimeHttpServer | undefined;
+  let port: number;
+  let eventHub: AssistantEventHub;
+
+  beforeEach(() => {
+    const db = getDb();
+    db.run('DELETE FROM messages');
+    db.run('DELETE FROM conversations');
+    db.run('DELETE FROM conversation_keys');
+    eventHub = new AssistantEventHub();
+  });
+
+  // Stop the server after every test, even when an assertion throws, so a
+  // failing test cannot leak a listening socket into the next one.
+  afterEach(async () => {
+    await stopServer();
+  });
+
+  afterAll(() => {
+    resetDb();
+    try { rmSync(testDir, { recursive: true, force: true }); } catch { /* best effort */ }
+  });
+
+  async function startServer(sessionFactory: () => Session): Promise<void> {
+    port = 19000 + Math.floor(Math.random() * 1000);
+    server = new RuntimeHttpServer({
+      port,
+      bearerToken: TEST_TOKEN,
+      sendMessageDeps: {
+        getOrCreateSession: async () => sessionFactory(),
+        assistantEventHub: eventHub,
+        resolveAttachments: () => [],
+      },
+    });
+    await server.start();
+  }
+
+  async function stopServer(): Promise<void> {
+    const running = server;
+    server = undefined;
+    await running?.stop();
+  }
+
+  function messagesUrl(): string {
+    return `http://127.0.0.1:${port}/v1/messages`;
+  }
+
+  // ── Idle session: immediate processing ──────────────────────────────
+
+  test('returns 202 with accepted: true and messageId when session is idle', async () => {
+    await startServer(() => makeCompletingSession());
+
+    const res = await fetch(messagesUrl(), {
+      method: 'POST',
+      headers: { 'Content-Type': 'application/json', ...AUTH_HEADERS },
+      body: JSON.stringify({ conversationKey: 'conv-idle', content: 'Hello', sourceChannel: 'macos' }),
+    });
+    const body = await res.json() as { accepted: boolean; messageId: string };
+
+    expect(res.status).toBe(202);
+    expect(body.accepted).toBe(true);
+    expect(body.messageId).toBeDefined();
+  });
+
+  test('publishes events to assistantEventHub when session is idle', async () => {
+    const publishedEvents: AssistantEvent[] = [];
+
+    await startServer(() => makeCompletingSession());
+
+    eventHub.subscribe(
+      { assistantId: 'self' },
+      (event) => { publishedEvents.push(event); },
+    );
+
+    const res = await fetch(messagesUrl(), {
+      method: 'POST',
+      headers: { 'Content-Type': 'application/json', ...AUTH_HEADERS },
+      body: JSON.stringify({ conversationKey: 'conv-hub', content: 'Hello hub', sourceChannel: 'macos' }),
+    });
+    expect(res.status).toBe(202);
+
+    // Wait for the async agent loop to complete and events to be published
+    await new Promise((r) => setTimeout(r, 100));
+
+    // Should have received assistant_text_delta and message_complete
+    const types = publishedEvents.map((e) => e.message.type);
+    expect(types).toContain('assistant_text_delta');
+    expect(types).toContain('message_complete');
+  });
+
+  // ── Busy session: queue-if-busy ─────────────────────────────────────
+
+  test('returns 202 with queued: true when session is busy (not 409)', async () => {
+    const session = makeHangingSession();
+    await startServer(() => session);
+
+    // First message starts the agent loop and makes the session busy
+    const res1 = await fetch(messagesUrl(), {
+      method: 'POST',
+      headers: { 'Content-Type': 'application/json', ...AUTH_HEADERS },
+      body: JSON.stringify({ conversationKey: 'conv-busy', content: 'First', sourceChannel: 'macos' }),
+    });
+    expect(res1.status).toBe(202);
+    const body1 = await res1.json() as { accepted: boolean; messageId: string };
+    expect(body1.accepted).toBe(true);
+    expect(body1.messageId).toBeDefined();
+
+    // Wait for the agent loop to start
+    await new Promise((r) => setTimeout(r, 30));
+
+    // Second message should be queued, not rejected
+    const res2 = await fetch(messagesUrl(), {
+      method: 'POST',
+      headers: { 'Content-Type': 'application/json', ...AUTH_HEADERS },
+      body: JSON.stringify({ conversationKey: 'conv-busy', content: 'Second', sourceChannel: 'macos' }),
+    });
+    const body2 = await res2.json() as { accepted: boolean; queued: boolean };
+
+    expect(res2.status).toBe(202);
+    expect(body2.accepted).toBe(true);
+    expect(body2.queued).toBe(true);
+  });
+
+  // ── Validation ──────────────────────────────────────────────────────
+
+  test('returns 400 when sourceChannel is missing', async () => {
+    await startServer(() => makeCompletingSession());
+
+    const res = await fetch(messagesUrl(), {
+      method: 'POST',
+      headers: { 'Content-Type': 'application/json', ...AUTH_HEADERS },
+      body: JSON.stringify({ conversationKey: 'conv-val', content: 'Hello' }),
+    });
+    expect(res.status).toBe(400);
+  });
+
+  test('returns 400 when content is empty', async () => {
+    await startServer(() => makeCompletingSession());
+
+    const res = await fetch(messagesUrl(), {
+      method: 'POST',
+      headers: { 'Content-Type': 'application/json', ...AUTH_HEADERS },
+      body: JSON.stringify({ conversationKey: 'conv-empty', content: '', sourceChannel: 'macos' }),
+    });
+    expect(res.status).toBe(400);
+  });
+
+  test('returns 400 when conversationKey is missing', async () => {
+    await startServer(() => makeCompletingSession());
+
+    const res = await fetch(messagesUrl(), {
+      method: 'POST',
+      headers: { 'Content-Type': 'application/json', ...AUTH_HEADERS },
+      body: JSON.stringify({ content: 'Hello', sourceChannel: 'macos' }),
+    });
+    expect(res.status).toBe(400);
+  });
+});
diff --git a/assistant/src/daemon/lifecycle.ts b/assistant/src/daemon/lifecycle.ts
index f3636ae0d0a..6c03a86c0eb 100644
--- a/assistant/src/daemon/lifecycle.ts
+++ b/assistant/src/daemon/lifecycle.ts
@@ -36,6 +36,8 @@ import { QdrantManager } from '../memory/qdrant-manager.js';
 import { initQdrantClient } from '../memory/qdrant-client.js';
 import { startScheduler } from '../schedule/scheduler.js';
 import { RuntimeHttpServer } from '../runtime/http-server.js';
+import { assistantEventHub } from '../runtime/assistant-event-hub.js';
+import * as attachmentsStore from '../memory/attachments-store.js';
 import { getHookManager } from '../hooks/manager.js';
 import { installTemplates } from '../hooks/templates.js';
 import { installCliLaunchers } from './install-cli-launchers.js';
@@ -263,13 +265,24 @@ export async function runDaemon(): Promise<void> {
     interfacesDir: getInterfacesDir(),
     approvalCopyGenerator: createApprovalCopyGenerator(),
     approvalConversationGenerator: createApprovalConversationGenerator(),
+    sendMessageDeps: {
+      getOrCreateSession: (conversationId) =>
+        server.getSessionForMessages(conversationId),
+      assistantEventHub,
+      resolveAttachments: (attachmentIds) =>
+        attachmentsStore.getAttachmentsByIds(attachmentIds).map((a) => ({
+          id: a.id,
+          filename: a.originalFilename,
+          mimeType: a.mimeType,
+          data: a.dataBase64,
+        })),
+    },
   });
 
   // Inject the voice bridge orchestrator BEFORE attempting to start the HTTP
   // server. The bridge only needs the RunOrchestrator instance (already created
   // above) and must be available even when the HTTP server fails to bind.
   setVoiceBridgeOrchestrator(runOrchestrator);
-
   try {
     await runtimeHttp.start();
     setRelayBroadcast((msg) => server.broadcast(msg));
diff --git a/assistant/src/daemon/server.ts b/assistant/src/daemon/server.ts
index 7f1b873e028..f9f02ed81a5 100644
--- a/assistant/src/daemon/server.ts
+++ b/assistant/src/daemon/server.ts
@@ -819,6 +819,14 @@ export class DaemonServer {
     return { messageId };
   }
 
+  /**
+   * Expose session lookup for the POST /v1/messages handler.
+   * The handler manages busy-state checking and queueing itself.
+   */
+  async getSessionForMessages(conversationId: string): Promise<Session> {
+    return this.getOrCreateSession(conversationId, undefined, true);
+  }
+
   createRunOrchestrator(): RunOrchestrator {
     return new RunOrchestrator({
       getOrCreateSession: (conversationId, transport) =>
diff --git a/assistant/src/runtime/http-server.ts b/assistant/src/runtime/http-server.ts
index be689bf8893..ff8a8a4562d 100644
--- a/assistant/src/runtime/http-server.ts
+++ b/assistant/src/runtime/http-server.ts
@@ -121,6 +121,7 @@ export type {
   RuntimeAttachmentMetadata,
   ApprovalCopyGenerator,
   ApprovalConversationGenerator,
+  SendMessageDeps,
 } from './http-types.js';
 
 import type {
@@ -129,6 +130,7 @@ import type {
   RuntimeHttpServerOptions,
   ApprovalCopyGenerator,
   ApprovalConversationGenerator,
+  SendMessageDeps,
 } from './http-types.js';
 
 const log = getLogger('runtime-http');
@@ -156,6 +158,7 @@ export class RuntimeHttpServer {
   private sweepInProgress = false;
   private pairingStore = new PairingStore();
   private pairingBroadcast?: (msg: ServerMessage) => void;
+  private sendMessageDeps?: SendMessageDeps;
 
   constructor(options: RuntimeHttpServerOptions = {}) {
     this.port = options.port ?? DEFAULT_PORT;
@@ -167,6 +170,7 @@ export class RuntimeHttpServer {
     this.approvalCopyGenerator = options.approvalCopyGenerator;
     this.approvalConversationGenerator = options.approvalConversationGenerator;
     this.interfacesDir = options.interfacesDir ?? null;
+    this.sendMessageDeps = options.sendMessageDeps;
   }
 
   /** The port the server is actually listening on (resolved after start). */
@@ -558,6 +562,7 @@ export class RuntimeHttpServer {
         return await handleSendMessage(req, {
           processMessage: this.processMessage,
           persistAndProcessMessage: this.persistAndProcessMessage,
+          sendMessageDeps: this.sendMessageDeps,
         });
       }
 
diff --git a/assistant/src/runtime/http-types.ts b/assistant/src/runtime/http-types.ts
index 4a6029bb15c..7d87077a1ed 100644
--- a/assistant/src/runtime/http-types.ts
+++ b/assistant/src/runtime/http-types.ts
@@ -5,6 +5,8 @@ import type { ChannelId } from '../channels/types.js';
 import type { RunOrchestrator } from './run-orchestrator.js';
 import type { GuardianRuntimeContext } from '../daemon/session-runtime-assembly.js';
 import type { ApprovalMessageContext, ComposeApprovalMessageGenerativeOptions } from './approval-message-composer.js';
+import type { Session } from '../daemon/session.js';
+import type { AssistantEventHub } from './assistant-event-hub.js';
 
 /**
  * Daemon-injected function that generates approval copy using a provider.
@@ -84,6 +86,24 @@ export type NonBlockingMessageProcessor = (
   sourceChannel?: ChannelId,
 ) => Promise<{ messageId: string }>;
 
+/**
+ * Dependencies for the POST /v1/messages handler.
+ *
+ * The handler needs direct access to the session so it can check busy state,
+ * persist user messages, fire the agent loop, or queue messages when busy.
+ * Hub publishing wires outbound events to the SSE stream.
+ */
+export interface SendMessageDeps {
+  getOrCreateSession: (conversationId: string) => Promise<Session>;
+  assistantEventHub: AssistantEventHub;
+  resolveAttachments: (attachmentIds: string[]) => Array<{
+    id: string;
+    filename: string;
+    mimeType: string;
+    data: string;
+  }>;
+}
+
 export interface RuntimeHttpServerOptions {
   port?: number;
   /** Hostname / IP to bind to. Defaults to '127.0.0.1' (loopback-only). */
@@ -101,6 +121,8 @@ export interface RuntimeHttpServerOptions {
   approvalCopyGenerator?: ApprovalCopyGenerator;
   /** Daemon-injected generator for conversational approval flow (provider-backed). */
   approvalConversationGenerator?: ApprovalConversationGenerator;
+  /** Dependencies for the POST /v1/messages queue-if-busy handler. */
+  sendMessageDeps?: SendMessageDeps;
 }
 
 export interface RuntimeAttachmentMetadata {
diff --git a/assistant/src/runtime/routes/conversation-routes.ts b/assistant/src/runtime/routes/conversation-routes.ts
index 9c3d8ff1bf5..c437313608a 100644
--- a/assistant/src/runtime/routes/conversation-routes.ts
+++ b/assistant/src/runtime/routes/conversation-routes.ts
@@ -18,7 +18,13 @@ import type {
   NonBlockingMessageProcessor,
   RuntimeAttachmentMetadata,
   RuntimeMessagePayload,
+  SendMessageDeps,
 } from '../http-types.js';
+import type { ServerMessage } from '../../daemon/ipc-protocol.js';
+import { buildAssistantEvent } from '../assistant-event.js';
+import { getLogger } from '../../util/logger.js';
+
+const log = getLogger('conversation-routes');
 
 const SUGGESTION_CACHE_MAX = 100;
 
@@ -134,11 +140,40 @@ export function handleListMessages(
   return Response.json({ messages });
 }
 
+/**
+ * Build an `onEvent` callback that publishes every outbound event to the
+ * assistant event hub, maintaining ordered delivery through a serial chain.
+ */
+function makeHubPublisher(
+  deps: SendMessageDeps,
+  conversationId: string,
+): (msg: ServerMessage) => void {
+  let hubChain: Promise<void> = Promise.resolve();
+  return (msg: ServerMessage) => {
+    const msgRecord = msg as unknown as Record<string, unknown>;
+    const msgSessionId =
+      'sessionId' in msg && typeof msgRecord.sessionId === 'string'
+        ? (msgRecord.sessionId as string)
+        : undefined;
+    const resolvedSessionId = msgSessionId ?? conversationId;
+    const event = buildAssistantEvent('self', msg, resolvedSessionId);
+    hubChain = (async () => {
+      await hubChain;
+      try {
+        await deps.assistantEventHub.publish(event);
+      } catch (err) {
+        log.warn({ err }, 'assistant-events hub subscriber threw during POST /messages');
+      }
+    })();
+  };
+}
+
 export async function handleSendMessage(
   req: Request,
   deps: {
     processMessage?: MessageProcessor;
     persistAndProcessMessage?: NonBlockingMessageProcessor;
+    sendMessageDeps?: SendMessageDeps;
   },
 ): Promise<Response> {
   const body = await req.json() as {
@@ -204,6 +239,47 @@ export async function handleSendMessage(
 
   const mapping = getOrCreateConversation(conversationKey);
 
+  // ── Queue-if-busy path (preferred when sendMessageDeps is wired) ────
+  if (deps.sendMessageDeps) {
+    const smDeps = deps.sendMessageDeps;
+    const session = await smDeps.getOrCreateSession(mapping.conversationId);
+    const onEvent = makeHubPublisher(smDeps, mapping.conversationId);
+
+    const attachments = hasAttachments
+      ? smDeps.resolveAttachments(attachmentIds)
+      : [];
+
+    if (session.isProcessing()) {
+      // Queue the message so it's processed when the current turn completes
+      const requestId = crypto.randomUUID();
+      const result = session.enqueueMessage(
+        content ?? '',
+        attachments,
+        onEvent,
+        requestId,
+      );
+      if (result.rejected) {
+        return Response.json(
+          { error: 'Message queue is full. Please retry later.' },
+          { status: 429 },
+        );
+      }
+      return Response.json({ accepted: true, queued: true }, { status: 202 });
+    }
+
+    // Session is idle — persist and fire agent loop immediately
+    const requestId = crypto.randomUUID();
+    const messageId = session.persistUserMessage(content ?? '', attachments, requestId);
+
+    // Fire-and-forget the agent loop; events flow to the hub via onEvent
+    session.runAgentLoop(content ?? '', messageId, onEvent).catch((err) => {
+      log.error({ err, conversationId: mapping.conversationId }, 'Agent loop failed (POST /messages)');
+    });
+
+    return Response.json({ accepted: true, messageId }, { status: 202 });
+  }
+
+  // ── Legacy path (fallback when sendMessageDeps not wired) ───────────
   const processor = deps.persistAndProcessMessage ?? deps.processMessage;
   if (!processor) {
     return Response.json({ error: 'Message processing not configured' }, { status: 503 });
@@ -217,7 +293,7 @@ export async function handleSendMessage(
       undefined,
       sourceChannel,
     );
-    return Response.json({ accepted: true, messageId: result.messageId });
+    return Response.json({ accepted: true, messageId: result.messageId }, { status: 202 });
   } catch (err) {
     if (err instanceof Error && err.message === 'Session is already processing a message') {
       return Response.json(
diff --git a/gateway/src/index.ts b/gateway/src/index.ts
index e2763f64727..5e9de8381d2 100644
--- a/gateway/src/index.ts
+++ b/gateway/src/index.ts
@@ -43,13 +43,22 @@ function startHttpTokenWatcher(cfg: GatewayConfig): FSWatcher | null {
     ?? join(process.env.BASE_DATA_DIR?.trim() || homedir(), ".vellum", "http-token");
   const dir = dirname(tokenPath);
 
-  if (!existsSync(dir)) {
-    mkdirSync(dir, { recursive: true });
+  try {
+    if (!existsSync(dir)) {
+      mkdirSync(dir, { recursive: true });
+    }
+  } catch (err) {
+    log.warn({ err, path: dir }, "Cannot create token directory, skipping http-token watcher");
+    return null;
   }
 
   let debounceTimer: ReturnType<typeof setTimeout> | null = null;
 
   function refresh(): void {
+    // Skip file-based refresh when env vars explicitly pin the tokens —
+    // respect the same precedence as loadConfig().
+    if (process.env.RUNTIME_BEARER_TOKEN) return;
+
     try {
       const token = readFileSync(tokenPath, "utf-8").trim() || undefined;
       if (token && token !== cfg.runtimeBearerToken) {