Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions assistant/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4210,13 +4210,17 @@ paths:
type: array
items: {}
description: Array of message objects
interfaceFiles:
type: array
items: {}
description: Interface file paths with modification timestamps
hasMore:
description: Whether older messages exist beyond this page
type: boolean
oldestTimestamp:
description: Timestamp of the oldest message in this page (ms since epoch)
type: number
oldestMessageId:
description: ID of the oldest message in this page
type: string
required:
- messages
- interfaceFiles
additionalProperties: false
post:
operationId: messages_post
Expand Down
171 changes: 171 additions & 0 deletions assistant/src/__tests__/list-messages-attachments.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -286,3 +286,174 @@ describe("handleListMessages no_response filtering", () => {
expect(body.messages[0].content).toBe("What does <no_response/> do?");
});
});

// ---------------------------------------------------------------------------
// Pagination
// ---------------------------------------------------------------------------

interface PaginatedResponse {
messages: { id: string; content: string; timestamp: string }[];
hasMore?: boolean;
oldestTimestamp?: number;
oldestMessageId?: string;
}

function createPaginatedUrl(
conversationId: string,
params?: { limit?: string; beforeTimestamp?: string },
): URL {
const url = new URL(
`http://localhost/v1/messages?conversationId=${conversationId}`,
);
if (params?.limit !== undefined) url.searchParams.set("limit", params.limit);
if (params?.beforeTimestamp !== undefined)
url.searchParams.set("beforeTimestamp", params.beforeTimestamp);
return url;
}

/** Helper: insert N messages with distinct, increasing timestamps and return them in insertion order. */
async function insertMessages(
conversationId: string,
count: number,
): Promise<{ id: string; createdAt: number }[]> {
const msgs: { id: string; createdAt: number }[] = [];
for (let i = 0; i < count; i++) {
const msg = await addMessage(
conversationId,
i % 2 === 0 ? "user" : "assistant",
JSON.stringify([{ type: "text", text: `msg-${i}` }]),
);
msgs.push({ id: msg.id, createdAt: msg.createdAt });
}
return msgs;
}

describe("handleListMessages pagination", () => {
beforeEach(resetTables);

test("no params → all messages, no hasMore field", async () => {
const conv = createConversation();
await insertMessages(conv.id, 5);

const response = handleListMessages(createTestUrl(conv.id), null);
const body = (await response.json()) as PaginatedResponse;

expect(body.messages).toHaveLength(5);
expect(body.hasMore).toBeUndefined();
expect(body.oldestTimestamp).toBeUndefined();
expect(body.oldestMessageId).toBeUndefined();
});

test("limit only (no beforeTimestamp) → all messages, no hasMore", async () => {
const conv = createConversation();
await insertMessages(conv.id, 5);

const url = createPaginatedUrl(conv.id, { limit: "3" });
const response = handleListMessages(url, null);
const body = (await response.json()) as PaginatedResponse;

// Option A: without beforeTimestamp, all messages are returned regardless of limit
expect(body.messages).toHaveLength(5);
expect(body.hasMore).toBeUndefined();
});

test("beforeTimestamp + limit → correct page with hasMore: true", async () => {
const conv = createConversation();
const msgs = await insertMessages(conv.id, 10);

// Cursor is message[7]'s timestamp; limit=3 → should return messages [4,5,6]
const url = createPaginatedUrl(conv.id, {
beforeTimestamp: String(msgs[7].createdAt),
limit: "3",
});
const response = handleListMessages(url, null);
const body = (await response.json()) as PaginatedResponse;

expect(body.messages).toHaveLength(3);
expect(body.messages.map((m) => m.id)).toEqual([
msgs[4].id,
msgs[5].id,
msgs[6].id,
]);
expect(body.hasMore).toBe(true);
});

test("beforeTimestamp is strictly exclusive", async () => {
const conv = createConversation();
const msgs = await insertMessages(conv.id, 3);

// Use message[1]'s exact timestamp as cursor — message[1] should NOT appear
const url = createPaginatedUrl(conv.id, {
beforeTimestamp: String(msgs[1].createdAt),
limit: "10",
});
const response = handleListMessages(url, null);
const body = (await response.json()) as PaginatedResponse;

const ids = body.messages.map((m) => m.id);
expect(ids).toContain(msgs[0].id);
expect(ids).not.toContain(msgs[1].id);
expect(ids).not.toContain(msgs[2].id);
});

test("hasMore: false when all older messages fit", async () => {
const conv = createConversation();
const msgs = await insertMessages(conv.id, 5);

// beforeTimestamp beyond the last message, limit larger than total count
const url = createPaginatedUrl(conv.id, {
beforeTimestamp: String(msgs[4].createdAt + 1),
limit: "10",
});
const response = handleListMessages(url, null);
const body = (await response.json()) as PaginatedResponse;

expect(body.messages).toHaveLength(5);
expect(body.hasMore).toBe(false);
});

test("oldestTimestamp and oldestMessageId match oldest returned message", async () => {
const conv = createConversation();
const msgs = await insertMessages(conv.id, 5);

// Fetch last 3 messages before a cursor past the end
const url = createPaginatedUrl(conv.id, {
beforeTimestamp: String(msgs[4].createdAt + 1),
limit: "3",
});
const response = handleListMessages(url, null);
const body = (await response.json()) as PaginatedResponse;

expect(body.messages).toHaveLength(3);
// Oldest returned message is msgs[2] (messages [2,3,4])
expect(body.oldestTimestamp).toBe(msgs[2].createdAt);
expect(body.oldestMessageId).toBe(msgs[2].id);
});

test("empty / nonexistent conversation → empty messages, no pagination metadata", async () => {
const url = createPaginatedUrl("nonexistent-conv-id");
const response = handleListMessages(url, null);
const body = (await response.json()) as PaginatedResponse;

expect(body.messages).toEqual([]);
expect(body.hasMore).toBeUndefined();
expect(body.oldestTimestamp).toBeUndefined();
expect(body.oldestMessageId).toBeUndefined();
});

test("invalid limit (NaN) → 400", async () => {
const conv = createConversation();
const url = createPaginatedUrl(conv.id, { limit: "abc" });
const response = handleListMessages(url, null);

expect(response.status).toBe(400);
});

test("invalid beforeTimestamp (NaN) → 400", async () => {
const conv = createConversation();
const url = createPaginatedUrl(conv.id, { beforeTimestamp: "abc" });
const response = handleListMessages(url, null);

expect(response.status).toBe(400);
});
});
72 changes: 72 additions & 0 deletions assistant/src/memory/conversation-crud.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
gte,
inArray,
isNull,
lt,
lte,
sql,
} from "drizzle-orm";
Expand Down Expand Up @@ -1112,6 +1113,77 @@ export function getMessages(conversationId: string): MessageRow[] {
.map(parseMessage);
}

export interface PaginatedMessagesResult {
messages: MessageRow[];
hasMore: boolean;
}

export function getMessagesPaginated(
conversationId: string,
limit: number | undefined,
beforeTimestamp?: number,
): PaginatedMessagesResult {
const db = getDb();

if (limit === undefined) {
const conditions = [eq(messages.conversationId, conversationId)];
if (beforeTimestamp !== undefined) {
conditions.push(lt(messages.createdAt, beforeTimestamp));
}
const rows = db
.select()
.from(messages)
.where(and(...conditions))
.orderBy(asc(messages.createdAt))
.all()
.map(parseMessage);
return { messages: rows, hasMore: false };
}

const conditions = [eq(messages.conversationId, conversationId)];
if (beforeTimestamp !== undefined) {
conditions.push(lt(messages.createdAt, beforeTimestamp));
}
Comment on lines +1144 to +1146
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Include message-id tie-breaker in pagination filter

The pagination query filters only on created_at < beforeTimestamp, so if multiple messages share the same millisecond as the page boundary, the next page permanently skips the remaining same-timestamp messages. This is a real data-loss-in-pagination case for legacy/imported conversations where duplicate millisecond timestamps exist, and the commit already exposes oldestMessageId but never uses it to disambiguate cursors. Please paginate with a stable (created_at, id) cursor (and request-side beforeMessageId) to avoid dropped history rows.

Useful? React with 👍 / 👎.


const rows = db
.select()
.from(messages)
.where(and(...conditions))
.orderBy(desc(messages.createdAt))
.limit(limit + 1)
.all()
.map(parseMessage);

const hasMore = rows.length > limit;
if (hasMore) {
rows.splice(limit);
}
rows.reverse();

return { messages: rows, hasMore };
}

export function getLastAssistantTimestampBefore(
conversationId: string,
beforeTimestamp: number,
): number {
const db = getDb();
const row = db
.select({ createdAt: messages.createdAt })
.from(messages)
.where(
and(
eq(messages.conversationId, conversationId),
eq(messages.role, "assistant"),
lt(messages.createdAt, beforeTimestamp),
),
)
.orderBy(desc(messages.createdAt))
.limit(1)
.get();
return row?.createdAt ?? 0;
}

/** Fetch a single message by ID, optionally scoped to a specific conversation. */
export function getMessageById(
messageId: string,
Expand Down
4 changes: 4 additions & 0 deletions assistant/src/memory/db-init.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ import {
migrateLlmRequestLogMessageId,
migrateLlmRequestLogProvider,
migrateMemoryItemSupersession,
migrateMessagesConversationCreatedAtIndex,
migrateMessagesFtsBackfill,
migrateNormalizePhoneIdentities,
migrateNotificationDeliveryThreadDecision,
Expand Down Expand Up @@ -520,6 +521,9 @@ export function initializeDb(): void {
// 93. Strip `integration:` prefix from provider_key across OAuth tables
migrateStripIntegrationPrefixFromProviderKeys(database);

// 94. Composite index on messages(conversation_id, created_at) for paginated history queries
migrateMessagesConversationCreatedAtIndex(database);

validateMigrationState(database);

if (process.env.BUN_TEST === "1") {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import type { DrizzleDb } from "../db-connection.js";

export function migrateMessagesConversationCreatedAtIndex(
database: DrizzleDb,
): void {
database.run(
/*sql*/ `CREATE INDEX IF NOT EXISTS idx_messages_conversation_created_at ON messages(conversation_id, created_at)`,
);
}
1 change: 1 addition & 0 deletions assistant/src/memory/migrations/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ export { migrateContactsUserFileColumn } from "./192-contacts-user-file-column.j
export { migrateAddSourceTypeColumns } from "./193-add-source-type-columns.js";
export { migrateCreateMemoryRecallLogs } from "./194-memory-recall-logs.js";
export { migrateOAuthProvidersPingConfig } from "./195-oauth-providers-ping-config.js";
export { migrateMessagesConversationCreatedAtIndex } from "./196-messages-conversation-created-at-index.js";
export { migrateStripIntegrationPrefixFromProviderKeys } from "./196-strip-integration-prefix-from-provider-keys.js";
export {
MIGRATION_REGISTRY,
Expand Down
Loading
Loading