Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
284 changes: 284 additions & 0 deletions assistant/src/__tests__/send-endpoint-busy.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,284 @@
/**
* Tests for POST /v1/messages queue-if-busy behavior and hub publishing.
*
* Validates that:
* - Messages are accepted (202) when the session is idle, with hub events published.
* - Messages are queued (202, queued: true) when the session is busy, not 409.
* - SSE subscribers receive events from messages sent via this endpoint.
*/
import { describe, test, expect, beforeEach, afterAll, mock } from 'bun:test';
import { mkdtempSync, rmSync, realpathSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import type { ServerMessage } from '../daemon/ipc-protocol.js';
import type { Session } from '../daemon/session.js';

const testDir = realpathSync(mkdtempSync(join(tmpdir(), 'send-endpoint-busy-test-')));

mock.module('../util/platform.js', () => ({
getRootDir: () => testDir,
getDataDir: () => testDir,
isMacOS: () => process.platform === 'darwin',
isLinux: () => process.platform === 'linux',
isWindows: () => process.platform === 'win32',
getSocketPath: () => join(testDir, 'test.sock'),
getPidPath: () => join(testDir, 'test.pid'),
getDbPath: () => join(testDir, 'test.db'),
getLogPath: () => join(testDir, 'test.log'),
ensureDataDir: () => {},
}));

mock.module('../util/logger.js', () => ({
getLogger: () => new Proxy({} as Record<string, unknown>, {
get: () => () => {},
}),
}));

mock.module('../config/loader.js', () => ({
getConfig: () => ({
model: 'test',
provider: 'test',
apiKeys: {},
memory: { enabled: false },
rateLimit: { maxRequestsPerMinute: 0, maxTokensPerSession: 0 },
secretDetection: { enabled: false },
}),
}));

import { initializeDb, getDb, resetDb } from '../memory/db.js';
import { RuntimeHttpServer } from '../runtime/http-server.js';
import { AssistantEventHub } from '../runtime/assistant-event-hub.js';
import type { AssistantEvent } from '../runtime/assistant-event.js';

initializeDb();

// ---------------------------------------------------------------------------
// Session helpers
// ---------------------------------------------------------------------------

/** Session that completes its agent loop quickly and emits a text delta + message_complete. */
function makeCompletingSession(): Session {
let processing = false;
return {
isProcessing: () => processing,
persistUserMessage: (_content: string, _attachments: unknown[], requestId?: string) => {
processing = true;
return requestId ?? 'msg-1';
},
memoryPolicy: { scopeId: 'default', includeDefaultFallback: false, strictSideEffects: false },
setChannelCapabilities: () => {},
setAssistantId: () => {},
setGuardianContext: () => {},
setCommandIntent: () => {},
updateClient: () => {},
enqueueMessage: () => ({ queued: false, requestId: 'noop' }),
runAgentLoop: async (_content: string, _messageId: string, onEvent: (msg: ServerMessage) => void) => {
onEvent({ type: 'assistant_text_delta', text: 'Hello!' });
onEvent({ type: 'message_complete', sessionId: 'test-session' });
processing = false;
},
handleConfirmationResponse: () => {},
handleSecretResponse: () => {},
} as unknown as Session;
}

/** Session that hangs forever in the agent loop (simulates a busy session). */
function makeHangingSession(): Session {
let processing = false;
const enqueuedMessages: Array<{ content: string; onEvent: (msg: ServerMessage) => void; requestId: string }> = [];
return {
isProcessing: () => processing,
persistUserMessage: (_content: string, _attachments: unknown[], requestId?: string) => {
processing = true;
return requestId ?? 'msg-1';
},
memoryPolicy: { scopeId: 'default', includeDefaultFallback: false, strictSideEffects: false },
setChannelCapabilities: () => {},
setAssistantId: () => {},
setGuardianContext: () => {},
setCommandIntent: () => {},
updateClient: () => {},
enqueueMessage: (content: string, _attachments: unknown[], onEvent: (msg: ServerMessage) => void, requestId: string) => {
enqueuedMessages.push({ content, onEvent, requestId });
return { queued: true, requestId };
},
runAgentLoop: async () => {
// Hang forever
await new Promise<void>(() => {});
},
handleConfirmationResponse: () => {},
handleSecretResponse: () => {},
_enqueuedMessages: enqueuedMessages,
} as unknown as Session;
}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

const TEST_TOKEN = 'test-bearer-token-send';
const AUTH_HEADERS = { Authorization: `Bearer ${TEST_TOKEN}` };

describe('POST /v1/messages — queue-if-busy and hub publishing', () => {
let server: RuntimeHttpServer;
let port: number;
let eventHub: AssistantEventHub;

beforeEach(() => {
const db = getDb();
db.run('DELETE FROM messages');
db.run('DELETE FROM conversations');
db.run('DELETE FROM conversation_keys');
eventHub = new AssistantEventHub();
});

afterAll(() => {
resetDb();
try { rmSync(testDir, { recursive: true, force: true }); } catch { /* best effort */ }
});

async function startServer(sessionFactory: () => Session): Promise<void> {
port = 19000 + Math.floor(Math.random() * 1000);
server = new RuntimeHttpServer({
port,
bearerToken: TEST_TOKEN,
sendMessageDeps: {
getOrCreateSession: async () => sessionFactory(),
assistantEventHub: eventHub,
resolveAttachments: () => [],
},
});
await server.start();
}

async function stopServer(): Promise<void> {
await server?.stop();
}

function messagesUrl(): string {
return `http://127.0.0.1:${port}/v1/messages`;
}

// ── Idle session: immediate processing ──────────────────────────────

test('returns 202 with accepted: true and messageId when session is idle', async () => {
await startServer(() => makeCompletingSession());

const res = await fetch(messagesUrl(), {
method: 'POST',
headers: { 'Content-Type': 'application/json', ...AUTH_HEADERS },
body: JSON.stringify({ conversationKey: 'conv-idle', content: 'Hello', sourceChannel: 'macos' }),
});
const body = await res.json() as { accepted: boolean; messageId: string };

expect(res.status).toBe(202);
expect(body.accepted).toBe(true);
expect(body.messageId).toBeDefined();

await stopServer();
});

test('publishes events to assistantEventHub when session is idle', async () => {
const publishedEvents: AssistantEvent[] = [];

await startServer(() => makeCompletingSession());

eventHub.subscribe(
{ assistantId: 'self' },
(event) => { publishedEvents.push(event); },
);

const res = await fetch(messagesUrl(), {
method: 'POST',
headers: { 'Content-Type': 'application/json', ...AUTH_HEADERS },
body: JSON.stringify({ conversationKey: 'conv-hub', content: 'Hello hub', sourceChannel: 'macos' }),
});
expect(res.status).toBe(202);

// Wait for the async agent loop to complete and events to be published
await new Promise((r) => setTimeout(r, 100));

// Should have received assistant_text_delta and message_complete
const types = publishedEvents.map((e) => e.message.type);
expect(types).toContain('assistant_text_delta');
expect(types).toContain('message_complete');

await stopServer();
});

// ── Busy session: queue-if-busy ─────────────────────────────────────

test('returns 202 with queued: true when session is busy (not 409)', async () => {
const session = makeHangingSession();
await startServer(() => session);

// First message starts the agent loop and makes the session busy
const res1 = await fetch(messagesUrl(), {
method: 'POST',
headers: { 'Content-Type': 'application/json', ...AUTH_HEADERS },
body: JSON.stringify({ conversationKey: 'conv-busy', content: 'First', sourceChannel: 'macos' }),
});
expect(res1.status).toBe(202);
const body1 = await res1.json() as { accepted: boolean; messageId: string };
expect(body1.accepted).toBe(true);
expect(body1.messageId).toBeDefined();

// Wait for the agent loop to start
await new Promise((r) => setTimeout(r, 30));

// Second message should be queued, not rejected
const res2 = await fetch(messagesUrl(), {
method: 'POST',
headers: { 'Content-Type': 'application/json', ...AUTH_HEADERS },
body: JSON.stringify({ conversationKey: 'conv-busy', content: 'Second', sourceChannel: 'macos' }),
});
const body2 = await res2.json() as { accepted: boolean; queued: boolean };

expect(res2.status).toBe(202);
expect(body2.accepted).toBe(true);
expect(body2.queued).toBe(true);

await stopServer();
});

// ── Validation ──────────────────────────────────────────────────────

test('returns 400 when sourceChannel is missing', async () => {
await startServer(() => makeCompletingSession());

const res = await fetch(messagesUrl(), {
method: 'POST',
headers: { 'Content-Type': 'application/json', ...AUTH_HEADERS },
body: JSON.stringify({ conversationKey: 'conv-val', content: 'Hello' }),
});
expect(res.status).toBe(400);

await stopServer();
});

test('returns 400 when content is empty', async () => {
await startServer(() => makeCompletingSession());

const res = await fetch(messagesUrl(), {
method: 'POST',
headers: { 'Content-Type': 'application/json', ...AUTH_HEADERS },
body: JSON.stringify({ conversationKey: 'conv-empty', content: '', sourceChannel: 'macos' }),
});
expect(res.status).toBe(400);

await stopServer();
});

test('returns 400 when conversationKey is missing', async () => {
await startServer(() => makeCompletingSession());

const res = await fetch(messagesUrl(), {
method: 'POST',
headers: { 'Content-Type': 'application/json', ...AUTH_HEADERS },
body: JSON.stringify({ content: 'Hello', sourceChannel: 'macos' }),
});
expect(res.status).toBe(400);

await stopServer();
});
});
14 changes: 14 additions & 0 deletions assistant/src/daemon/lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import { QdrantManager } from '../memory/qdrant-manager.js';
import { initQdrantClient } from '../memory/qdrant-client.js';
import { startScheduler } from '../schedule/scheduler.js';
import { RuntimeHttpServer } from '../runtime/http-server.js';
import { assistantEventHub } from '../runtime/assistant-event-hub.js';
import * as attachmentsStore from '../memory/attachments-store.js';
import { getHookManager } from '../hooks/manager.js';
import { installTemplates } from '../hooks/templates.js';
import { installCliLaunchers } from './install-cli-launchers.js';
Expand Down Expand Up @@ -260,6 +262,18 @@ export async function runDaemon(): Promise<void> {
interfacesDir: getInterfacesDir(),
approvalCopyGenerator: createApprovalCopyGenerator(),
approvalConversationGenerator: createApprovalConversationGenerator(),
sendMessageDeps: {
getOrCreateSession: (conversationId) =>
server.getSessionForMessages(conversationId),
assistantEventHub,
resolveAttachments: (attachmentIds) =>
attachmentsStore.getAttachmentsByIds(attachmentIds).map((a) => ({
id: a.id,
filename: a.originalFilename,
mimeType: a.mimeType,
data: a.dataBase64,
})),
},
});
try {
await runtimeHttp.start();
Expand Down
8 changes: 8 additions & 0 deletions assistant/src/daemon/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -819,6 +819,14 @@ export class DaemonServer {
return { messageId };
}

/**
* Expose session lookup for the POST /v1/messages handler.
* The handler manages busy-state checking and queueing itself.
*/
async getSessionForMessages(conversationId: string): Promise<Session> {
return this.getOrCreateSession(conversationId, undefined, true);
}

createRunOrchestrator(): RunOrchestrator {
return new RunOrchestrator({
getOrCreateSession: (conversationId, transport) =>
Expand Down
5 changes: 5 additions & 0 deletions assistant/src/runtime/http-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ export type {
RuntimeAttachmentMetadata,
ApprovalCopyGenerator,
ApprovalConversationGenerator,
SendMessageDeps,
} from './http-types.js';

import type {
Expand All @@ -129,6 +130,7 @@ import type {
RuntimeHttpServerOptions,
ApprovalCopyGenerator,
ApprovalConversationGenerator,
SendMessageDeps,
} from './http-types.js';

const log = getLogger('runtime-http');
Expand Down Expand Up @@ -156,6 +158,7 @@ export class RuntimeHttpServer {
private sweepInProgress = false;
private pairingStore = new PairingStore();
private pairingBroadcast?: (msg: ServerMessage) => void;
private sendMessageDeps?: SendMessageDeps;

constructor(options: RuntimeHttpServerOptions = {}) {
this.port = options.port ?? DEFAULT_PORT;
Expand All @@ -167,6 +170,7 @@ export class RuntimeHttpServer {
this.approvalCopyGenerator = options.approvalCopyGenerator;
this.approvalConversationGenerator = options.approvalConversationGenerator;
this.interfacesDir = options.interfacesDir ?? null;
this.sendMessageDeps = options.sendMessageDeps;
}

/** The port the server is actually listening on (resolved after start). */
Expand Down Expand Up @@ -558,6 +562,7 @@ export class RuntimeHttpServer {
return await handleSendMessage(req, {
processMessage: this.processMessage,
persistAndProcessMessage: this.persistAndProcessMessage,
sendMessageDeps: this.sendMessageDeps,
});
}

Expand Down
Loading