diff --git a/assistant/openapi.yaml b/assistant/openapi.yaml index 7648c3f59be..0c4a03c7dc8 100644 --- a/assistant/openapi.yaml +++ b/assistant/openapi.yaml @@ -4616,8 +4616,10 @@ paths: properties: id: type: string + description: Assistant-minted internal conversation id. The authoritative identifier for the conversation. conversationKey: type: string + description: Echo of the optional external key supplied by the client (or the value the daemon minted when omitted). conversationType: type: string created: @@ -4636,14 +4638,15 @@ paths: type: object properties: conversationKey: + description: + Optional external key. Echoed back in the response. Non-vellum channels (Telegram, WhatsApp) use this to + scope to a logical channel thread; vellum-web clients can omit it and rely on the assistant-minted + `id`. type: string - description: Idempotency key for the conversation conversationType: description: Only standard conversations are created by this endpoint type: string const: standard - required: - - conversationKey additionalProperties: false /v1/conversations/{conversationId}/slack-channel/resolve: post: @@ -7651,12 +7654,20 @@ paths: "200": description: Successful response parameters: + - name: conversationId + in: query + required: false + schema: + type: string + description: Scope to a single conversation by its assistant-minted internal id. 404s if no such conversation exists. - name: conversationKey in: query required: false schema: type: string - description: Scope to a single conversation + description: + Scope to a single conversation by an external key (non-vellum channels) or the web idempotency key. + Materializes a row on first use. Ignored when conversationId is also provided. /v1/events/emit: post: operationId: events_emit_post diff --git a/assistant/src/__tests__/conversation-routes-disk-view.test.ts b/assistant/src/__tests__/conversation-routes-disk-view.test.ts index 28fdddc884b..8716ab1d5c9 100644 --- a/assistant/src/__tests__/conversation-routes-disk-view.test.ts +++ b/assistant/src/__tests__/conversation-routes-disk-view.test.ts @@ -498,6 +498,115 @@ describe("macOS browser backend fallback (no extension, no cdp-inspect)", () => }); }); +describe("POST /v1/messages — body.conversationId direct id lookup", () => { + // The handler accepts two scope inputs with distinct semantics: + // + // - `body.conversationId` is the assistant-minted internal id and is + // looked up directly. A missing row is a 404 — clients must obtain + // the id from a prior daemon response. + // - `body.conversationKey` is an external key (non-vellum channels / + // web idempotency); resolved via the conversation_keys table and + // materialised on first use. + // + // When both are sent, `conversationId` wins and `conversationKey` is + // ignored. (Don't combine — fetch by one and then the other.) + + async function sendMessage( + body: Record, + successStatus = 202, + ): Promise { + return callHandler( + (args) => + handleSendMessage(args, { + sendMessageDeps: { + getOrCreateConversation: async (conversationId: string) => + getOrCreateFakeConversation(conversationId), + assistantEventHub: new AssistantEventHub(), + resolveAttachments: () => [], + }, + }), + new Request("http://localhost/v1/messages", { + method: "POST", + headers: { + "Content-Type": "application/json", + "x-vellum-principal-type": authContext.principalType, + }, + body: JSON.stringify(body), + }), + undefined, + successStatus, + ); + } + + test("body.conversationId= scopes the send to that conversation", async () => { + // Pre-materialise a conversation via the key path, then send a message + // by its assistant-minted internal id. + const externalKey = `pre-materialised-${crypto.randomUUID()}`; + const seeded = getOrCreateConversationMapping(externalKey); + + const response = await sendMessage({ + conversationId: seeded.conversationId, + content: "Direct id lookup — should reuse the existing conversation.", + sourceChannel: "vellum", + interface: "macos", + }); + + expect(response.status).toBe(202); + const body = (await response.json()) as { + accepted: boolean; + conversationId: string; + }; + expect(body.accepted).toBe(true); + expect(body.conversationId).toBe(seeded.conversationId); + + // No new external-key row should be materialised under the internal id. + expect(getConversationByKey(seeded.conversationId)).toBeNull(); + }); + + test("body.conversationId= returns 404", async () => { + const response = await sendMessage( + { + conversationId: `does-not-exist-${crypto.randomUUID()}`, + content: "Should 404 — unknown internal id.", + sourceChannel: "vellum", + interface: "macos", + }, + 404, + ); + expect(response.status).toBe(404); + const body = (await response.json()) as { + error?: { code?: string; message?: string }; + }; + expect(body.error?.code).toBe("NOT_FOUND"); + expect(body.error?.message).toMatch(/not found/i); + }); + + test("body.conversationId is honored and body.conversationKey is ignored when both are sent", async () => { + // Seed a conversation for the id we'll send. Also seed a separate + // conversation under a key the client will pass alongside — but the + // handler must scope to the id, NOT the key. + const idSeed = getOrCreateConversationMapping( + `id-honored-${crypto.randomUUID()}`, + ); + const keyValue = `key-ignored-${crypto.randomUUID()}`; + const keySeed = getOrCreateConversationMapping(keyValue); + expect(idSeed.conversationId).not.toBe(keySeed.conversationId); + + const response = await sendMessage({ + conversationId: idSeed.conversationId, + conversationKey: keyValue, + content: "Both fields sent — id should win.", + sourceChannel: "vellum", + interface: "macos", + }); + + expect(response.status).toBe(202); + const body = (await response.json()) as { conversationId: string }; + expect(body.conversationId).toBe(idSeed.conversationId); + expect(body.conversationId).not.toBe(keySeed.conversationId); + }); +}); + describe("conversationKey send path disk-view regression", () => { test("first send on a fresh conversationKey creates disk-view dir and writes user+assistant records", async () => { const conversationKey = `fresh-conv-key-${crypto.randomUUID()}`; diff --git a/assistant/src/__tests__/runtime-events-sse-bilingual.test.ts b/assistant/src/__tests__/runtime-events-sse-bilingual.test.ts new file mode 100644 index 00000000000..5e2a6714bd5 --- /dev/null +++ b/assistant/src/__tests__/runtime-events-sse-bilingual.test.ts @@ -0,0 +1,154 @@ +/** + * `GET /v1/events` (`handleSubscribeAssistantEvents`) — bilingual scope + * resolution. Two query params are accepted, with distinct semantics: + * + * - `?conversationId=` — looks up the conversation row + * directly by its assistant-minted id. 404 if not found. Does NOT + * materialise a new row. + * - `?conversationKey=` — resolves via the + * `conversation_keys` table; materialises on first use. Ignored when + * `conversationId` is also supplied. + * + * Companion to `runtime-events-sse.test.ts`, which exercises the broader + * `?conversationKey=` happy/error path. + */ + +import { beforeEach, describe, expect, mock, test } from "bun:test"; + +mock.module("../util/logger.js", () => ({ + getLogger: () => + new Proxy({} as Record, { + get: () => () => {}, + }), +})); + +mock.module("../config/loader.js", () => ({ + getConfig: () => ({ + ui: {}, + model: "test", + provider: "test", + memory: { enabled: false }, + rateLimit: { maxRequestsPerMinute: 0 }, + secretDetection: { enabled: false }, + }), +})); + +import { getOrCreateConversation } from "../memory/conversation-key-store.js"; +import { getDb } from "../memory/db-connection.js"; +import { initializeDb } from "../memory/db-init.js"; +import { buildAssistantEvent } from "../runtime/assistant-event.js"; +import { AssistantEventHub } from "../runtime/assistant-event-hub.js"; +import { + BadRequestError, + NotFoundError, +} from "../runtime/routes/errors.js"; +import { handleSubscribeAssistantEvents } from "../runtime/routes/events-routes.js"; + +initializeDb(); + +describe("GET /v1/events — bilingual scope query params", () => { + beforeEach(() => { + const db = getDb(); + db.run("DELETE FROM conversation_keys"); + db.run("DELETE FROM conversations"); + }); + + test("?conversationId= scopes the stream to that conversation", async () => { + // Materialise a conversation via the key path, then subscribe to it + // directly by its internal id. + const { conversationId } = getOrCreateConversation("sse-id-scope-source"); + + const ac = new AbortController(); + const testHub = new AssistantEventHub(); + + const stream = handleSubscribeAssistantEvents( + { + queryParams: { conversationId }, + abortSignal: ac.signal, + }, + { hub: testHub }, + ); + + const reader = stream.getReader(); + // Consume the initial heartbeat. + const heartbeat = await reader.read(); + expect(new TextDecoder().decode(heartbeat.value)).toBe(": heartbeat\n\n"); + + // Publish an event scoped to that conversation — should be delivered. + await testHub.publish(buildAssistantEvent({ type: "pong" }, conversationId)); + + const { value, done } = await reader.read(); + ac.abort(); + + expect(done).toBe(false); + const frame = new TextDecoder().decode(value); + expect(frame).toContain("event: assistant_event"); + expect(frame).toContain(`"conversationId":"${conversationId}"`); + }); + + test("?conversationId= throws NotFoundError", () => { + expect(() => + handleSubscribeAssistantEvents({ + queryParams: { conversationId: "does-not-exist" }, + abortSignal: new AbortController().signal, + }), + ).toThrow(NotFoundError); + }); + + test("?conversationId is honored and ?conversationKey is ignored when both are present", async () => { + // Materialise two distinct conversations: one we'll subscribe to by id, + // one we'll publish to via the ignored key. + const { conversationId: idConv } = getOrCreateConversation("sse-id-wins"); + const { conversationId: keyConv } = getOrCreateConversation( + "sse-key-ignored", + ); + expect(idConv).not.toBe(keyConv); + + const ac = new AbortController(); + const testHub = new AssistantEventHub(); + + const stream = handleSubscribeAssistantEvents( + { + queryParams: { + conversationId: idConv, + conversationKey: "sse-key-ignored", + }, + abortSignal: ac.signal, + }, + { hub: testHub }, + ); + const reader = stream.getReader(); + await reader.read(); // heartbeat + + // Publish on the "key" conversation — should NOT be delivered (filter + // is locked to idConv because conversationId wins). + await testHub.publish(buildAssistantEvent({ type: "pong" }, keyConv)); + // Publish on the "id" conversation — should be delivered. + await testHub.publish(buildAssistantEvent({ type: "pong" }, idConv)); + + const { value } = await reader.read(); + ac.abort(); + const frame = new TextDecoder().decode(value); + + expect(frame).toContain(`"conversationId":"${idConv}"`); + expect(frame).not.toContain(`"conversationId":"${keyConv}"`); + }); + + test("empty conversationId is rejected with BadRequestError", () => { + expect(() => + handleSubscribeAssistantEvents({ + queryParams: { conversationId: "" }, + abortSignal: new AbortController().signal, + }), + ).toThrow(BadRequestError); + }); + + test("empty conversationKey is still rejected (legacy parity)", () => { + expect(() => + handleSubscribeAssistantEvents({ + queryParams: { conversationKey: "" }, + abortSignal: new AbortController().signal, + }), + ).toThrow(BadRequestError); + }); +}); diff --git a/assistant/src/runtime/routes/conversation-management-routes.ts b/assistant/src/runtime/routes/conversation-management-routes.ts index ffb313ce8fa..fc2cc3a27bc 100644 --- a/assistant/src/runtime/routes/conversation-management-routes.ts +++ b/assistant/src/runtime/routes/conversation-management-routes.ts @@ -106,6 +106,13 @@ function handleCreateConversation({ body = {}, headers }: RouteHandlerArgs) { }, "Created conversation via POST", ); + // `id` is the assistant-minted internal `conversations.id` — the + // authoritative identifier for this conversation. `conversationKey` + // echoes the optional external key supplied by the client (or the + // UUID we minted) and is the identifier non-vellum channel adapters + // (Telegram, WhatsApp, etc.) use to scope to a logical channel + // thread. Vellum-web clients can ignore `conversationKey` and use + // `id` directly. return { id: result.conversationId, conversationKey, @@ -428,15 +435,26 @@ export const ROUTES: RouteDefinition[] = [ requestBody: z.object({ conversationKey: z .string() - .describe("Idempotency key for the conversation"), + .optional() + .describe( + "Optional external key. Echoed back in the response. Non-vellum channels (Telegram, WhatsApp) use this to scope to a logical channel thread; vellum-web clients can omit it and rely on the assistant-minted `id`.", + ), conversationType: z .literal("standard") .optional() .describe("Only standard conversations are created by this endpoint"), }), responseBody: z.object({ - id: z.string(), - conversationKey: z.string(), + id: z + .string() + .describe( + "Assistant-minted internal conversation id. The authoritative identifier for the conversation.", + ), + conversationKey: z + .string() + .describe( + "Echo of the optional external key supplied by the client (or the value the daemon minted when omitted).", + ), conversationType: z.string(), created: z.boolean(), }), diff --git a/assistant/src/runtime/routes/conversation-routes.ts b/assistant/src/runtime/routes/conversation-routes.ts index 3a738da00a4..d56631a22d8 100644 --- a/assistant/src/runtime/routes/conversation-routes.ts +++ b/assistant/src/runtime/routes/conversation-routes.ts @@ -121,7 +121,12 @@ import { resolveTrustContext, withSourceChannel, } from "../trust-context-resolver.js"; -import { BadRequestError, InternalError, RouteError } from "./errors.js"; +import { + BadRequestError, + InternalError, + NotFoundError, + RouteError, +} from "./errors.js"; import type { RouteDefinition, RouteHandlerArgs } from "./types.js"; import { RouteResponse } from "./types.js"; @@ -1223,6 +1228,7 @@ export async function handleSendMessage( ): Promise { const body = (rawBody ?? {}) as { conversationKey?: string; + conversationId?: string; content?: string; attachmentIds?: string[]; sourceChannel?: string; @@ -1258,6 +1264,10 @@ export async function handleSendMessage( headers?.["x-vellum-client-id"]?.trim() || undefined; const { conversationKey, content, attachmentIds } = body; + const inboundConversationId = + typeof body.conversationId === "string" && body.conversationId.length > 0 + ? body.conversationId + : undefined; const clientMessageId = typeof body.clientMessageId === "string" ? body.clientMessageId : undefined; const requestedInferenceProfile = @@ -1325,12 +1335,6 @@ export async function handleSendMessage( ? (canonicalizeTimeZone(body.clientTimezone) ?? undefined) : undefined; - // When conversationKey is omitted, derive a stable default from - // sourceChannel + sourceInterface so that repeated calls from the same - // channel/interface pair share a single conversation thread. - const resolvedConversationKey = - conversationKey ?? `default:${sourceChannel}:${sourceInterface}`; - // Reject non-string content values (numbers, objects, etc.) if (content != null && typeof content !== "string") { throw new BadRequestError("content must be a string"); @@ -1394,9 +1398,40 @@ export async function handleSendMessage( // timer so the next heartbeat is a full interval after this interaction. HeartbeatService.getInstance()?.resetTimer(); - const mapping = getOrCreateConversation(resolvedConversationKey, { - conversationType: "standard", - }); + // Resolve the target conversation. Fetch by `conversationId` (the + // assistant-minted internal id) when the client supplies it — clients + // must obtain this id from a prior daemon response, so a missing row + // is a 404. Otherwise fall through to the external-key path: the + // client-supplied `conversationKey` (used by non-vellum channels and + // the web idempotency flow) or, when neither is provided, a stable + // default keyed on sourceChannel + sourceInterface so repeated calls + // from the same channel/interface share a single thread. + let mapping: { + conversationId: string; + conversationType: string; + created: boolean; + }; + if (inboundConversationId !== undefined) { + const existing = getConversation(inboundConversationId); + if (!existing) { + throw new NotFoundError( + `Conversation ${inboundConversationId} not found`, + ); + } + mapping = { + conversationId: existing.id, + conversationType: existing.conversationType, + created: false, + }; + } else { + const resolvedConversationKey = + conversationKey && conversationKey.length > 0 + ? conversationKey + : `default:${sourceChannel}:${sourceInterface}`; + mapping = getOrCreateConversation(resolvedConversationKey, { + conversationType: "standard", + }); + } if (requestedRiskThreshold !== undefined) { const result = await ipcCall("set_conversation_threshold", { diff --git a/assistant/src/runtime/routes/events-routes.ts b/assistant/src/runtime/routes/events-routes.ts index 1e3986673fe..4a2fec318a7 100644 --- a/assistant/src/runtime/routes/events-routes.ts +++ b/assistant/src/runtime/routes/events-routes.ts @@ -26,6 +26,7 @@ import { z } from "zod"; import type { HostProxyCapability } from "../../channels/types.js"; import { parseInterfaceId, supportsHostProxy } from "../../channels/types.js"; import { emitContactChange } from "../../contacts/contact-events.js"; +import { getConversation } from "../../memory/conversation-crud.js"; import { getOrCreateConversation } from "../../memory/conversation-key-store.js"; import { getLogger } from "../../util/logger.js"; import { formatSseFrame, formatSseHeartbeat } from "../assistant-event.js"; @@ -39,7 +40,11 @@ import { assistantEventHub, } from "../assistant-event-hub.js"; import { resolveActorPrincipalIdForLocalGuardian } from "../local-actor-identity.js"; -import { BadRequestError, ServiceUnavailableError } from "./errors.js"; +import { + BadRequestError, + NotFoundError, + ServiceUnavailableError, +} from "./errors.js"; import type { RouteDefinition, RouteHandlerArgs } from "./types.js"; const log = getLogger("events-routes"); @@ -220,9 +225,17 @@ const defaultSseShedReporter: SseShedReporter = (reason, inst) => { * Stream assistant events as Server-Sent Events. * * Query params: - * conversationKey -- optional; when provided, scopes the stream to one - * conversation. When omitted, the stream delivers events - * from ALL conversations for this assistant. + * conversationId -- optional; assistant-minted internal conversation id. + * When provided, the stream is scoped to that one + * conversation; the daemon 404s if no such conversation + * exists (clients must obtain the id from a prior + * response). + * conversationKey -- optional; external key (non-vellum channels) or the + * web idempotency key. Resolved via the conversation + * keys table; materializes a row on first use. + * Ignored when `conversationId` is also provided. + * When both are omitted, the stream delivers events from ALL + * conversations for this assistant. * * Headers (optional): * X-Vellum-Client-Id -- stable per-install UUID identifying this client. @@ -248,8 +261,18 @@ export function handleSubscribeAssistantEvents( ): ReadableStream { const { queryParams, headers, abortSignal } = args; - const conversationKey = queryParams?.conversationKey; - if ("conversationKey" in (queryParams ?? {}) && !conversationKey?.trim()) { + const rawConversationId = queryParams?.conversationId; + const rawConversationKey = queryParams?.conversationKey; + if ( + "conversationId" in (queryParams ?? {}) && + !rawConversationId?.trim() + ) { + throw new BadRequestError("conversationId must not be empty"); + } + if ( + "conversationKey" in (queryParams ?? {}) && + !rawConversationKey?.trim() + ) { throw new BadRequestError("conversationKey must not be empty"); } @@ -293,10 +316,25 @@ export function handleSubscribeAssistantEvents( "host_browser", ]; + // Resolve the scope. `conversationId` (when supplied) is the + // assistant-minted internal id — looked up directly; 404 if absent. + // Otherwise fall through to `conversationKey`, which is treated as an + // external key and resolved via the conversation_keys table + // (materialized on first use, preserving the existing subscribe-time + // create behavior for the web idempotency flow). const filter: AssistantEventFilter = {}; - if (conversationKey) { - const mapping = getOrCreateConversation(conversationKey); + let scopeConversationKey: string | null = null; + if (rawConversationId) { + const existing = getConversation(rawConversationId); + if (!existing) { + throw new NotFoundError(`Conversation ${rawConversationId} not found`); + } + filter.conversationId = existing.id; + scopeConversationKey = existing.id; + } else if (rawConversationKey) { + const mapping = getOrCreateConversation(rawConversationKey); filter.conversationId = mapping.conversationId; + scopeConversationKey = rawConversationKey; } const encoder = new TextEncoder(); @@ -316,7 +354,7 @@ export function handleSubscribeAssistantEvents( heartbeatsSent: 0, clientId, interfaceId, - conversationKey: conversationKey ?? null, + conversationKey: scopeConversationKey, }; ensureEventLoopDelayMonitorStarted(); @@ -470,9 +508,15 @@ export const ROUTES: RouteDefinition[] = [ description: "Stream assistant events as Server-Sent Events (SSE).", tags: ["events"], queryParams: [ + { + name: "conversationId", + description: + "Scope to a single conversation by its assistant-minted internal id. 404s if no such conversation exists.", + }, { name: "conversationKey", - description: "Scope to a single conversation", + description: + "Scope to a single conversation by an external key (non-vellum channels) or the web idempotency key. Materializes a row on first use. Ignored when conversationId is also provided.", }, ], responseHeaders: {