diff --git a/assistant/openapi.yaml b/assistant/openapi.yaml index 3c7eb803986..f92c45d11fb 100644 --- a/assistant/openapi.yaml +++ b/assistant/openapi.yaml @@ -4210,13 +4210,17 @@ paths: type: array items: {} description: Array of message objects - interfaceFiles: - type: array - items: {} - description: Interface file paths with modification timestamps + hasMore: + description: Whether older messages exist beyond this page + type: boolean + oldestTimestamp: + description: Timestamp of the oldest message in this page (ms since epoch) + type: number + oldestMessageId: + description: ID of the oldest message in this page + type: string required: - messages - - interfaceFiles additionalProperties: false post: operationId: messages_post diff --git a/assistant/src/__tests__/list-messages-attachments.test.ts b/assistant/src/__tests__/list-messages-attachments.test.ts index 0da5fa1a37a..013f1375a9f 100644 --- a/assistant/src/__tests__/list-messages-attachments.test.ts +++ b/assistant/src/__tests__/list-messages-attachments.test.ts @@ -286,3 +286,174 @@ describe("handleListMessages no_response filtering", () => { expect(body.messages[0].content).toBe("What does do?"); }); }); + +// --------------------------------------------------------------------------- +// Pagination +// --------------------------------------------------------------------------- + +interface PaginatedResponse { + messages: { id: string; content: string; timestamp: string }[]; + hasMore?: boolean; + oldestTimestamp?: number; + oldestMessageId?: string; +} + +function createPaginatedUrl( + conversationId: string, + params?: { limit?: string; beforeTimestamp?: string }, +): URL { + const url = new URL( + `http://localhost/v1/messages?conversationId=${conversationId}`, + ); + if (params?.limit !== undefined) url.searchParams.set("limit", params.limit); + if (params?.beforeTimestamp !== undefined) + url.searchParams.set("beforeTimestamp", params.beforeTimestamp); + return url; +} + +/** Helper: insert N messages with distinct, increasing timestamps and return them in insertion order. */ +async function insertMessages( + conversationId: string, + count: number, +): Promise<{ id: string; createdAt: number }[]> { + const msgs: { id: string; createdAt: number }[] = []; + for (let i = 0; i < count; i++) { + const msg = await addMessage( + conversationId, + i % 2 === 0 ? "user" : "assistant", + JSON.stringify([{ type: "text", text: `msg-${i}` }]), + ); + msgs.push({ id: msg.id, createdAt: msg.createdAt }); + } + return msgs; +} + +describe("handleListMessages pagination", () => { + beforeEach(resetTables); + + test("no params → all messages, no hasMore field", async () => { + const conv = createConversation(); + await insertMessages(conv.id, 5); + + const response = handleListMessages(createTestUrl(conv.id), null); + const body = (await response.json()) as PaginatedResponse; + + expect(body.messages).toHaveLength(5); + expect(body.hasMore).toBeUndefined(); + expect(body.oldestTimestamp).toBeUndefined(); + expect(body.oldestMessageId).toBeUndefined(); + }); + + test("limit only (no beforeTimestamp) → all messages, no hasMore", async () => { + const conv = createConversation(); + await insertMessages(conv.id, 5); + + const url = createPaginatedUrl(conv.id, { limit: "3" }); + const response = handleListMessages(url, null); + const body = (await response.json()) as PaginatedResponse; + + // Option A: without beforeTimestamp, all messages are returned regardless of limit + expect(body.messages).toHaveLength(5); + expect(body.hasMore).toBeUndefined(); + }); + + test("beforeTimestamp + limit → correct page with hasMore: true", async () => { + const conv = createConversation(); + const msgs = await insertMessages(conv.id, 10); + + // Cursor is message[7]'s timestamp; limit=3 → should return messages [4,5,6] + const url = createPaginatedUrl(conv.id, { + beforeTimestamp: String(msgs[7].createdAt), + limit: "3", + }); + const response = handleListMessages(url, null); + const body = (await response.json()) as PaginatedResponse; + + expect(body.messages).toHaveLength(3); + expect(body.messages.map((m) => m.id)).toEqual([ + msgs[4].id, + msgs[5].id, + msgs[6].id, + ]); + expect(body.hasMore).toBe(true); + }); + + test("beforeTimestamp is strictly exclusive", async () => { + const conv = createConversation(); + const msgs = await insertMessages(conv.id, 3); + + // Use message[1]'s exact timestamp as cursor — message[1] should NOT appear + const url = createPaginatedUrl(conv.id, { + beforeTimestamp: String(msgs[1].createdAt), + limit: "10", + }); + const response = handleListMessages(url, null); + const body = (await response.json()) as PaginatedResponse; + + const ids = body.messages.map((m) => m.id); + expect(ids).toContain(msgs[0].id); + expect(ids).not.toContain(msgs[1].id); + expect(ids).not.toContain(msgs[2].id); + }); + + test("hasMore: false when all older messages fit", async () => { + const conv = createConversation(); + const msgs = await insertMessages(conv.id, 5); + + // beforeTimestamp beyond the last message, limit larger than total count + const url = createPaginatedUrl(conv.id, { + beforeTimestamp: String(msgs[4].createdAt + 1), + limit: "10", + }); + const response = handleListMessages(url, null); + const body = (await response.json()) as PaginatedResponse; + + expect(body.messages).toHaveLength(5); + expect(body.hasMore).toBe(false); + }); + + test("oldestTimestamp and oldestMessageId match oldest returned message", async () => { + const conv = createConversation(); + const msgs = await insertMessages(conv.id, 5); + + // Fetch last 3 messages before a cursor past the end + const url = createPaginatedUrl(conv.id, { + beforeTimestamp: String(msgs[4].createdAt + 1), + limit: "3", + }); + const response = handleListMessages(url, null); + const body = (await response.json()) as PaginatedResponse; + + expect(body.messages).toHaveLength(3); + // Oldest returned message is msgs[2] (messages [2,3,4]) + expect(body.oldestTimestamp).toBe(msgs[2].createdAt); + expect(body.oldestMessageId).toBe(msgs[2].id); + }); + + test("empty / nonexistent conversation → empty messages, no pagination metadata", async () => { + const url = createPaginatedUrl("nonexistent-conv-id"); + const response = handleListMessages(url, null); + const body = (await response.json()) as PaginatedResponse; + + expect(body.messages).toEqual([]); + expect(body.hasMore).toBeUndefined(); + expect(body.oldestTimestamp).toBeUndefined(); + expect(body.oldestMessageId).toBeUndefined(); + }); + + test("invalid limit (NaN) → 400", async () => { + const conv = createConversation(); + const url = createPaginatedUrl(conv.id, { limit: "abc" }); + const response = handleListMessages(url, null); + + expect(response.status).toBe(400); + }); + + test("invalid beforeTimestamp (NaN) → 400", async () => { + const conv = createConversation(); + const url = createPaginatedUrl(conv.id, { beforeTimestamp: "abc" }); + const response = handleListMessages(url, null); + + expect(response.status).toBe(400); + }); +}); diff --git a/assistant/src/memory/conversation-crud.ts b/assistant/src/memory/conversation-crud.ts index 3a22596fb99..b119e915249 100644 --- a/assistant/src/memory/conversation-crud.ts +++ b/assistant/src/memory/conversation-crud.ts @@ -10,6 +10,7 @@ import { gte, inArray, isNull, + lt, lte, sql, } from "drizzle-orm"; @@ -1112,6 +1113,77 @@ export function getMessages(conversationId: string): MessageRow[] { .map(parseMessage); } +export interface PaginatedMessagesResult { + messages: MessageRow[]; + hasMore: boolean; +} + +export function getMessagesPaginated( + conversationId: string, + limit: number | undefined, + beforeTimestamp?: number, +): PaginatedMessagesResult { + const db = getDb(); + + if (limit === undefined) { + const conditions = [eq(messages.conversationId, conversationId)]; + if (beforeTimestamp !== undefined) { + conditions.push(lt(messages.createdAt, beforeTimestamp)); + } + const rows = db + .select() + .from(messages) + .where(and(...conditions)) + .orderBy(asc(messages.createdAt)) + .all() + .map(parseMessage); + return { messages: rows, hasMore: false }; + } + + const conditions = [eq(messages.conversationId, conversationId)]; + if (beforeTimestamp !== undefined) { + conditions.push(lt(messages.createdAt, beforeTimestamp)); + } + + const rows = db + .select() + .from(messages) + .where(and(...conditions)) + .orderBy(desc(messages.createdAt)) + .limit(limit + 1) + .all() + .map(parseMessage); + + const hasMore = rows.length > limit; + if (hasMore) { + rows.splice(limit); + } + rows.reverse(); + + return { messages: rows, hasMore }; +} + +export function getLastAssistantTimestampBefore( + conversationId: string, + beforeTimestamp: number, +): number { + const db = getDb(); + const row = db + .select({ createdAt: messages.createdAt }) + .from(messages) + .where( + and( + eq(messages.conversationId, conversationId), + eq(messages.role, "assistant"), + lt(messages.createdAt, beforeTimestamp), + ), + ) + .orderBy(desc(messages.createdAt)) + .limit(1) + .get(); + return row?.createdAt ?? 0; +} + /** Fetch a single message by ID, optionally scoped to a specific conversation. */ export function getMessageById( messageId: string, diff --git a/assistant/src/memory/db-init.ts b/assistant/src/memory/db-init.ts index 36a058d5be6..ad4b98efe45 100644 --- a/assistant/src/memory/db-init.ts +++ b/assistant/src/memory/db-init.ts @@ -89,6 +89,7 @@ import { migrateLlmRequestLogMessageId, migrateLlmRequestLogProvider, migrateMemoryItemSupersession, + migrateMessagesConversationCreatedAtIndex, migrateMessagesFtsBackfill, migrateNormalizePhoneIdentities, migrateNotificationDeliveryThreadDecision, @@ -520,6 +521,9 @@ export function initializeDb(): void { // 93. Strip `integration:` prefix from provider_key across OAuth tables migrateStripIntegrationPrefixFromProviderKeys(database); + // 94. Composite index on messages(conversation_id, created_at) for paginated history queries + migrateMessagesConversationCreatedAtIndex(database); + validateMigrationState(database); if (process.env.BUN_TEST === "1") { diff --git a/assistant/src/memory/migrations/196-messages-conversation-created-at-index.ts b/assistant/src/memory/migrations/196-messages-conversation-created-at-index.ts new file mode 100644 index 00000000000..ce91449e225 --- /dev/null +++ b/assistant/src/memory/migrations/196-messages-conversation-created-at-index.ts @@ -0,0 +1,9 @@ +import type { DrizzleDb } from "../db-connection.js"; + +export function migrateMessagesConversationCreatedAtIndex( + database: DrizzleDb, +): void { + database.run( + /*sql*/ `CREATE INDEX IF NOT EXISTS idx_messages_conversation_created_at ON messages(conversation_id, created_at)`, + ); +} diff --git a/assistant/src/memory/migrations/index.ts b/assistant/src/memory/migrations/index.ts index 637462ee28a..d5182f3e83c 100644 --- a/assistant/src/memory/migrations/index.ts +++ b/assistant/src/memory/migrations/index.ts @@ -134,6 +134,7 @@ export { migrateContactsUserFileColumn } from "./192-contacts-user-file-column.j export { migrateAddSourceTypeColumns } from "./193-add-source-type-columns.js"; export { migrateCreateMemoryRecallLogs } from "./194-memory-recall-logs.js"; export { migrateOAuthProvidersPingConfig } from "./195-oauth-providers-ping-config.js"; +export { migrateMessagesConversationCreatedAtIndex } from "./196-messages-conversation-created-at-index.js"; export { migrateStripIntegrationPrefixFromProviderKeys } from "./196-strip-integration-prefix-from-provider-keys.js"; export { MIGRATION_REGISTRY, diff --git a/assistant/src/runtime/routes/conversation-routes.ts b/assistant/src/runtime/routes/conversation-routes.ts index e4d308e015f..2fb05f38d26 100644 --- a/assistant/src/runtime/routes/conversation-routes.ts +++ b/assistant/src/runtime/routes/conversation-routes.ts @@ -48,7 +48,10 @@ import { } from "../../memory/canonical-guardian-store.js"; import { addMessage, + getLastAssistantTimestampBefore, getMessages, + getMessagesPaginated, + type MessageRow, provenanceFromTrustContext, setConversationOriginChannelIfUnset, setConversationOriginInterfaceIfUnset, @@ -360,7 +363,49 @@ export function handleListMessages( if (!resolvedConversationId) { return Response.json({ messages: [] }); } - const rawMessages = getMessages(resolvedConversationId); + + const beforeTimestampRaw = url.searchParams.get("beforeTimestamp"); + const limitRaw = url.searchParams.get("limit"); + + // Validate: reject NaN values with 400 + if (beforeTimestampRaw !== null && isNaN(Number(beforeTimestampRaw))) { + return httpError( + "BAD_REQUEST", + "beforeTimestamp must be a valid number", + 400, + ); + } + if (limitRaw !== null && isNaN(Number(limitRaw))) { + return httpError("BAD_REQUEST", "limit must be a valid number", 400); + } + + const beforeTimestamp = beforeTimestampRaw + ? Number(beforeTimestampRaw) + : undefined; + // Clamp limit to 1-500 range + const limit = limitRaw + ? Math.min(Math.max(Math.floor(Number(limitRaw)), 1), 500) + : undefined; + + // Option A: only paginate when beforeTimestamp is present. + // Initial load and reconnect send limit but no beforeTimestamp — those must continue + // returning all messages for zero regression risk. + const isPaginated = beforeTimestamp != null; + + let rawMessages: MessageRow[]; + let hasMore = false; + + if (isPaginated) { + const result = getMessagesPaginated( + resolvedConversationId, + limit, + beforeTimestamp, + ); + rawMessages = result.messages; + hasMore = result.hasMore; + } else { + rawMessages = getMessages(resolvedConversationId); + } // Parse content blocks and extract text + tool calls const parsed = rawMessages.map((msg) => { @@ -429,6 +474,12 @@ export function handleListMessages( const interfaceFiles = getInterfaceFilesWithMtimes(interfacesDir); let prevAssistantTimestamp = 0; + if (isPaginated && rawMessages.length > 0) { + prevAssistantTimestamp = getLastAssistantTimestampBefore( + resolvedConversationId!, + rawMessages[0].createdAt, + ); + } const messages: RuntimeMessagePayload[] = parsed.map((m) => { let msgAttachments: RuntimeAttachmentMetadata[] = []; if (m.id) { @@ -498,6 +549,19 @@ export function handleListMessages( }; }); + if (isPaginated) { + const oldestTimestamp = + rawMessages.length > 0 ? rawMessages[0].createdAt : undefined; + const oldestMessageId = + rawMessages.length > 0 ? rawMessages[0].id : undefined; + return Response.json({ + messages, + hasMore, + ...(oldestTimestamp != null ? { oldestTimestamp } : {}), + ...(oldestMessageId != null ? { oldestMessageId } : {}), + }); + } + return Response.json({ messages }); } @@ -1502,9 +1566,20 @@ export function conversationRouteDefinitions(deps: { tags: ["messages"], responseBody: z.object({ messages: z.array(z.unknown()).describe("Array of message objects"), - interfaceFiles: z - .array(z.unknown()) - .describe("Interface file paths with modification timestamps"), + hasMore: z + .boolean() + .optional() + .describe("Whether older messages exist beyond this page"), + oldestTimestamp: z + .number() + .optional() + .describe( + "Timestamp of the oldest message in this page (ms since epoch)", + ), + oldestMessageId: z + .string() + .optional() + .describe("ID of the oldest message in this page"), }), handler: ({ url }) => handleListMessages(url, deps.interfacesDir), }, diff --git a/clients/shared/Features/Chat/ChatViewModel.swift b/clients/shared/Features/Chat/ChatViewModel.swift index 92491695741..5b12eaf802e 100644 --- a/clients/shared/Features/Chat/ChatViewModel.swift +++ b/clients/shared/Features/Chat/ChatViewModel.swift @@ -777,9 +777,12 @@ public final class ChatViewModel: ObservableObject { /// True while a previous-page load is in progress (brief async delay for UX). @Published public var isLoadingMoreMessages: Bool = false - /// Timeout task that logs a warning if the daemon takes too long to respond - /// to a pagination request. The flag is intentionally NOT cleared here — - /// see the comment in `loadPreviousMessagePage()` for rationale. + /// Timeout task that logs a warning at 30s if the daemon is slow, then + /// clears `isLoadingMoreMessages` at 60s to unblock the user. The 30s + /// warning preserves the flag to avoid misclassifying late-but-valid + /// responses (see loadPreviousMessagePage); the 60s hard clear accepts + /// the risk of a narrow misclassification window to prevent a permanently + /// stuck loading spinner. private var loadMoreTimeoutTask: Task? /// The subset of messages that are actually displayed (excludes subagent notifications @@ -891,11 +894,19 @@ public final class ChatViewModel: ObservableObject { // of prepending. The flag is properly cleared by populateFromHistory // when the response arrives, or by reconnect/conversation-switch logic if // the daemon disconnects. + // At 60s a hard clear of isLoadingMoreMessages fires to prevent a permanent + // stuck spinner. This accepts a narrow misclassification window for late + // responses arriving between 60-65s. loadMoreTimeoutTask?.cancel() loadMoreTimeoutTask = Task { @MainActor [weak self] in try? await Task.sleep(nanoseconds: 30_000_000_000) // 30 seconds guard let self, !Task.isCancelled, self.isLoadingMoreMessages else { return } log.warning("Pagination request still pending after 30s — daemon may be unresponsive") + try? await Task.sleep(nanoseconds: 30_000_000_000) // +30s = 60s total + guard !Task.isCancelled, self.isLoadingMoreMessages else { return } + log.error("Pagination request timed out after 60s — resetting pagination state") + self.isLoadingMoreMessages = false + self.loadMoreTimeoutTask = nil } onLoadMoreHistory?(conversationId, cursor) // The loading indicator is cleared by populateFromHistory when the response arrives. @@ -2889,7 +2900,7 @@ public final class ChatViewModel: ObservableObject { self.loadMoreTimeoutTask?.cancel() self.loadMoreTimeoutTask = nil self.isLoadingMoreMessages = false - trimOldMessagesIfNeeded() + // TODO: Add pagination-aware trim that doesn't regress historyCursor (follow-up) refreshModelMetadataIfNeeded(hasModelCommand) os_signpost(.end, log: Self.poiLog, name: "populateFromHistory", signpostID: spid, "path=pagination") return