Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions apps/web/src/domains/chat/api/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
type PreChatOnboardingContext,
} from "@/domains/onboarding/prechat.js";
import { persistPreChatOnboardingProfile } from "@/domains/onboarding/prechat-profile.js";
import { pickConversationIdWireField } from "@/lib/backwards-compat/conversation-id-wire-field.js";

const POLL_INTERVAL_MS = 1000;
const POLL_TIMEOUT_MS = 120_000;
Expand Down Expand Up @@ -485,11 +486,12 @@ export async function postChatMessage(
attachmentIds: string[] = [],
onboarding?: PreChatOnboardingContext,
): Promise<PostMessageResult> {
// Daemon 0.8.5+ accepts `conversationId` on this endpoint as a direct
// internal-id lookup; older daemons only understand the legacy
// `conversationKey` (external-key path). The gate that picks between
// them lives in `lib/backwards-compat/conversation-id-wire-field.ts`.
const body: Record<string, unknown> = {
// Daemon's send-message endpoint reads `body.conversationKey` only
// (see assistant/src/runtime/routes/conversation-routes.ts handleSendMessage).
// The web-side parameter is conversationId; map to the wire field here.
conversationKey: conversationId,
[pickConversationIdWireField()]: conversationId,
content,
sourceChannel: "vellum",
interface: "vellum",
Expand Down
96 changes: 96 additions & 0 deletions apps/web/src/domains/chat/api/post-chat-message.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { afterEach, beforeEach, describe, expect, mock, test } from "bun:test";

import { postChatMessage } from "@/domains/chat/api/messages.js";
import { useAssistantIdentityStore } from "@/stores/assistant-identity-store.js";

describe("postChatMessage onboarding payload", () => {
let originalFetch: typeof fetch;
Expand All @@ -10,6 +11,11 @@ describe("postChatMessage onboarding payload", () => {
beforeEach(() => {
originalFetch = globalThis.fetch;
capturedRequests = [];
// Reset the assistant-identity store so version-gated wire-field
// selection in `postChatMessage` defaults to the conservative legacy
// `conversationKey` path. Individual tests that exercise the new
// `conversationId` path opt in explicitly via `setIdentity(...)`.
useAssistantIdentityStore.getState().clearIdentity();
// The vellum-api client request interceptor calls ensureCsrfCookie() on
// mutating requests, which reads `document.cookie`. Stub a minimal
// `document` so the bun test (Node) environment doesn't throw.
Expand Down Expand Up @@ -177,3 +183,93 @@ describe("postChatMessage onboarding payload", () => {
expect(onboarding).not.toHaveProperty("assistantName");
});
});

describe("postChatMessage wire-field bilingual cutover", () => {
// Verifies the daemon-version gate selects the right wire field on
// `POST /v1/messages`. See `apps/web/src/assistant/version-compat.ts`
// for the cutover policy (legacy `conversationKey` for daemons older
// than 0.8.5, canonical `conversationId` for 0.8.5+).
let originalFetch: typeof fetch;
let originalDocument: unknown;
let capturedRequests: Array<{ url: string; body: string }> = [];

beforeEach(() => {
originalFetch = globalThis.fetch;
capturedRequests = [];
useAssistantIdentityStore.getState().clearIdentity();
originalDocument = (globalThis as { document?: unknown }).document;
(globalThis as { document?: unknown }).document = { cookie: "csrftoken=test" };
globalThis.fetch = mock(async (input: RequestInfo | URL, init?: RequestInit) => {
const url = input instanceof Request ? input.url : String(input);
let bodyText: string | undefined;
if (input instanceof Request) {
bodyText = await input.clone().text();
} else if (typeof init?.body === "string") {
bodyText = init.body;
}
capturedRequests.push({ url, body: bodyText ?? "" });
return new Response(
JSON.stringify({ accepted: true, messageId: "msg-1" }),
{ status: 200, headers: { "Content-Type": "application/json" } },
);
}) as unknown as typeof fetch;
});

afterEach(() => {
globalThis.fetch = originalFetch;
if (originalDocument === undefined) {
delete (globalThis as { document?: unknown }).document;
} else {
(globalThis as { document?: unknown }).document = originalDocument;
}
useAssistantIdentityStore.getState().clearIdentity();
});

function getMessageBody(): Record<string, unknown> {
const requests = capturedRequests.filter((r) => r.url.includes("/messages/"));
expect(requests).toHaveLength(1);
return JSON.parse(requests[0]!.body) as Record<string, unknown>;
}

test("uses conversationId wire field when daemon version >= 0.8.5", async () => {
useAssistantIdentityStore.getState().setIdentity("Vel", "0.8.5");

await postChatMessage("asst-1", "conv-internal-1", "hi");

const body = getMessageBody();
expect(body.conversationId).toBe("conv-internal-1");
expect(body).not.toHaveProperty("conversationKey");
});

test("uses conversationId wire field for newer daemons (e.g. 0.9.0)", async () => {
useAssistantIdentityStore.getState().setIdentity("Vel", "0.9.0");

await postChatMessage("asst-1", "conv-internal-2", "hi");

const body = getMessageBody();
expect(body.conversationId).toBe("conv-internal-2");
expect(body).not.toHaveProperty("conversationKey");
});

test("uses conversationKey wire field for daemons older than 0.8.5", async () => {
useAssistantIdentityStore.getState().setIdentity("Vel", "0.8.4");

await postChatMessage("asst-1", "conv-internal-3", "hi");

const body = getMessageBody();
expect(body.conversationKey).toBe("conv-internal-3");
expect(body).not.toHaveProperty("conversationId");
});

test("falls back to conversationKey when daemon version is unknown (identity not yet hydrated)", async () => {
// Default store state: version === null. This is the window
// between page load and the identity fetch resolving.
expect(useAssistantIdentityStore.getState().version).toBeNull();

await postChatMessage("asst-1", "conv-internal-4", "hi");

const body = getMessageBody();
expect(body.conversationKey).toBe("conv-internal-4");
expect(body).not.toHaveProperty("conversationId");
});
});
125 changes: 124 additions & 1 deletion apps/web/src/domains/chat/api/stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import {
} from "@/domains/messaging/turn-store.js";
import { parseAssistantEvent } from "@/domains/chat/api/event-parser.js";
import { subscribeChatEvents, type ChatStreamReconnectCause } from "@/domains/chat/api/stream.js";
import { useAssistantIdentityStore } from "@/stores/assistant-identity-store.js";

describe("polling reconciliation with state machine", () => {
/**
Expand Down Expand Up @@ -305,6 +306,10 @@ describe("subscribeChatEvents idle watchdog", () => {
// path but keeps the bun (Node) test env consistent.
originalDocument = (globalThis as { document?: unknown }).document;
(globalThis as { document?: unknown }).document = { cookie: "csrftoken=test" };
// Reset the version-gating store so subscribeChatEvents defaults to
// the legacy `conversationKey` wire field. Tests that exercise the
// newer `conversationId` path opt in explicitly via setIdentity().
useAssistantIdentityStore.getState().clearIdentity();
});

afterEach(() => {
Expand All @@ -314,9 +319,10 @@ describe("subscribeChatEvents idle watchdog", () => {
} else {
(globalThis as { document?: unknown }).document = originalDocument;
}
useAssistantIdentityStore.getState().clearIdentity();
});

test("omits conversationKey query when subscribing to all assistant events", async () => {
test("omits any conversation query param when subscribing to all assistant events", async () => {
const requestedUrls: string[] = [];
globalThis.fetch = mock(
async (input: RequestInfo | URL) => {
Expand Down Expand Up @@ -344,7 +350,124 @@ describe("subscribeChatEvents idle watchdog", () => {
await new Promise((r) => setTimeout(r, 50));
expect(requestedUrls).toHaveLength(1);
expect(requestedUrls[0]).toContain("/v1/assistants/asst-1/events/");
// Neither the legacy nor the canonical wire field should appear when
// subscribing to all assistant events (no conversation filter).
expect(requestedUrls[0]).not.toContain("conversationKey");
expect(requestedUrls[0]).not.toContain("conversationId");
} finally {
stream.cancel();
}
});

test("uses conversationId query when daemon version >= 0.8.5", async () => {
useAssistantIdentityStore.getState().setIdentity("Vel", "0.8.5");

const requestedUrls: string[] = [];
globalThis.fetch = mock(
async (input: RequestInfo | URL) => {
requestedUrls.push(input instanceof Request ? input.url : String(input));
return new Response(
new ReadableStream({
start(controller) {
controller.close();
},
}),
{ status: 200, headers: { "Content-Type": "text/event-stream" } },
);
},
) as unknown as typeof fetch;

const stream = subscribeChatEvents(
"asst-1",
"conv-internal-1",
() => {},
() => {},
{ idleTimeoutMs: 5_000, reconnectBaseDelayMs: 10_000 },
);

try {
await new Promise((r) => setTimeout(r, 50));
expect(requestedUrls).toHaveLength(1);
const url = new URL(requestedUrls[0]!);
expect(url.searchParams.get("conversationId")).toBe("conv-internal-1");
expect(url.searchParams.get("conversationKey")).toBeNull();
} finally {
stream.cancel();
}
});

test("uses conversationKey query when daemon version is older than 0.8.5", async () => {
useAssistantIdentityStore.getState().setIdentity("Vel", "0.8.4");

const requestedUrls: string[] = [];
globalThis.fetch = mock(
async (input: RequestInfo | URL) => {
requestedUrls.push(input instanceof Request ? input.url : String(input));
return new Response(
new ReadableStream({
start(controller) {
controller.close();
},
}),
{ status: 200, headers: { "Content-Type": "text/event-stream" } },
);
},
) as unknown as typeof fetch;

const stream = subscribeChatEvents(
"asst-1",
"conv-key-legacy",
() => {},
() => {},
{ idleTimeoutMs: 5_000, reconnectBaseDelayMs: 10_000 },
);

try {
await new Promise((r) => setTimeout(r, 50));
expect(requestedUrls).toHaveLength(1);
const url = new URL(requestedUrls[0]!);
expect(url.searchParams.get("conversationKey")).toBe("conv-key-legacy");
expect(url.searchParams.get("conversationId")).toBeNull();
} finally {
stream.cancel();
}
});

test("falls back to conversationKey when daemon version is unknown", async () => {
// Default store state: identity not yet hydrated. Conservative
// fallback to the legacy field is the only safe choice — older
// daemons silently ignore `conversationId`.
expect(useAssistantIdentityStore.getState().version).toBeNull();

const requestedUrls: string[] = [];
globalThis.fetch = mock(
async (input: RequestInfo | URL) => {
requestedUrls.push(input instanceof Request ? input.url : String(input));
return new Response(
new ReadableStream({
start(controller) {
controller.close();
},
}),
{ status: 200, headers: { "Content-Type": "text/event-stream" } },
);
},
) as unknown as typeof fetch;

const stream = subscribeChatEvents(
"asst-1",
"conv-unknown-version",
() => {},
() => {},
{ idleTimeoutMs: 5_000, reconnectBaseDelayMs: 10_000 },
);

try {
await new Promise((r) => setTimeout(r, 50));
expect(requestedUrls).toHaveLength(1);
const url = new URL(requestedUrls[0]!);
expect(url.searchParams.get("conversationKey")).toBe("conv-unknown-version");
expect(url.searchParams.get("conversationId")).toBeNull();
} finally {
stream.cancel();
}
Expand Down
15 changes: 11 additions & 4 deletions apps/web/src/domains/chat/api/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { client, SDK_BASE_OPTIONS } from "@/domains/chat/api/client.js";
import { recordChatDiagnostic, resolvePlatformTag } from "@/domains/chat/utils/diagnostics.js";
import { parseAssistantEvent, readEventConversationId } from "@/domains/chat/api/event-parser.js";
import type { AssistantEvent } from "@/domains/chat/api/event-types.js";
import { pickConversationIdWireField } from "@/lib/backwards-compat/conversation-id-wire-field.js";
import { getClientRegistrationHeaders } from "@/lib/telemetry/client-identity.js";
import {
markClientEstablished,
Expand Down Expand Up @@ -308,15 +309,21 @@ export function subscribeChatEvents(
dataFramesReceivedSinceConnect = 0;
let streamError: Error | null = null;
try {
// Daemon 0.8.5+ accepts `conversationId` on this endpoint as a
// direct internal-id lookup; older daemons only understand the
// legacy `conversationKey` (external-key path). The gate that
// picks between them lives in
// `lib/backwards-compat/conversation-id-wire-field.ts`.
const { stream } = await client.sse.get<Record<string, unknown> | string>({
...SDK_BASE_OPTIONS,
url: "/v1/assistants/{assistant_id}/events/",
path: { assistant_id: assistantId },
// SSE endpoint `GET /v1/assistants/{id}/events/` accepts only
// `conversationKey` as its query param (see events-routes.ts).
// Map the internal conversationId variable onto the wire field.
...(requestedConversationId
? { query: { conversationKey: requestedConversationId } }
? {
query: {
[pickConversationIdWireField()]: requestedConversationId,
},
}
: {}),
headers: {
Accept: "text/event-stream, application/json",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import { afterEach, beforeEach, describe, expect, test } from "bun:test";

import { pickConversationIdWireField } from "@/lib/backwards-compat/conversation-id-wire-field.js";
import { useAssistantIdentityStore } from "@/stores/assistant-identity-store.js";

function setVersion(version: string | null) {
useAssistantIdentityStore.getState().setIdentity("test-asst", version);
}

beforeEach(() => {
useAssistantIdentityStore.getState().clearIdentity();
});

afterEach(() => {
useAssistantIdentityStore.getState().clearIdentity();
});

// Exhaustive truth-table for the underlying semver gate lives in
// `utils.test.ts` (covers null/empty, unparseable, pre-release, `v`
// prefix, etc). Here we verify the wire-field branch on each side
// of the 0.8.5 boundary plus the conservative-on-unknown policy.
describe("pickConversationIdWireField", () => {
test("returns conversationKey when version is unknown", () => {
setVersion(null);
expect(pickConversationIdWireField()).toBe("conversationKey");
});

test("returns conversationKey for assistants on 0.8.4 and older", () => {
setVersion("0.8.4");
expect(pickConversationIdWireField()).toBe("conversationKey");
setVersion("0.7.0");
expect(pickConversationIdWireField()).toBe("conversationKey");
});

test("returns conversationId for assistants on 0.8.5+", () => {
setVersion("0.8.5");
expect(pickConversationIdWireField()).toBe("conversationId");
setVersion("0.8.6");
expect(pickConversationIdWireField()).toBe("conversationId");
setVersion("0.9.0");
expect(pickConversationIdWireField()).toBe("conversationId");
setVersion("1.0.0");
expect(pickConversationIdWireField()).toBe("conversationId");
});

test("treats RC builds of the cutover patch as supporting the new field", () => {
// 0.8.5-rc.1 ships with the same bilingual handlers as 0.8.5,
// so RC testers must get the new wire field.
setVersion("0.8.5-rc.1");
expect(pickConversationIdWireField()).toBe("conversationId");
setVersion("0.8.5-beta");
expect(pickConversationIdWireField()).toBe("conversationId");
});

test("returns conversationKey for unparseable versions", () => {
setVersion("garbage");
expect(pickConversationIdWireField()).toBe("conversationKey");
setVersion("0.8");
expect(pickConversationIdWireField()).toBe("conversationKey");
});
});
Loading