Skip to content

Commit 560853b

Browse files
authored
feat: cursor-based pagination for messages API (#92)
## Summary Implements cursor-based pagination for the `/messages` endpoint to enable infinite scroll in the UI. ### Breaking Change `GET /messages` now returns `PaginatedMessagesResponse` instead of raw array: ```json { "data": [...], "next_cursor": "...", "has_more": true } ``` ### Backend Changes - Add `cursor` and `direction` query params to `/messages` - Return `{data, next_cursor, has_more}` response format - Add `pagination.py` with cursor encode/decode utilities - Add `find_by_field_with_cursor()` to MongoDB adapter ### Frontend Changes - Replace `useTaskMessages` with `useInfiniteTaskMessages` hook - Implement infinite scroll with IntersectionObserver - Add scroll position preservation when loading older messages - Fix Thinking indicator timing - Remove deprecated `custom-subscribe-task-state.tsx` ### Related PRs - TypeScript SDK: TBD - Python SDK: scaleapi/scale-agentex-python#222 ### Suggested Deployment Order 1. Deploy this PR (backend) 2. Publish TypeScript SDK 3. Publish Python SDK 4. Deploy frontend (after updating SDK version)
1 parent 9ec90fe commit 560853b

File tree

7 files changed

+353
-34
lines changed

7 files changed

+353
-34
lines changed

agentex-ui/.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,7 @@ yarn-error.log*
4242
# typescript
4343
*.tsbuildinfo
4444
next-env.d.ts
45+
46+
# pnpm
47+
pnpm-lock.yaml
48+
pnpm-workspace.yaml

agentex/src/adapters/crud_store/adapter_mongodb.py

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -605,6 +605,102 @@ async def find_by_field(
605605
message=f"Failed to find items by field in MongoDB: {e}", detail=str(e)
606606
) from e
607607

608+
async def find_by_field_with_cursor(
609+
self,
610+
field_name: str,
611+
field_value: Any,
612+
limit: int | None = None,
613+
sort_by: dict[str, int] | None = None,
614+
before_id: str | None = None,
615+
after_id: str | None = None,
616+
) -> builtins.list[T]:
617+
"""
618+
Find documents by a given field with cursor-based pagination.
619+
Maps _id to .id for each returned item.
620+
621+
Args:
622+
field_name: The field name to search by
623+
field_value: The value to search for
624+
limit: Optional limit on the number of documents to return
625+
sort_by: Optional dictionary for sorting, e.g. {"created_at": -1} for descending
626+
before_id: Get documents created before this document ID
627+
after_id: Get documents created after this document ID
628+
629+
Note:
630+
Cursor pagination uses the created_at timestamp of the cursor document
631+
to filter results. This provides stable pagination even when new
632+
documents are added.
633+
"""
634+
try:
635+
# Map 'id' field to '_id' for MongoDB if needed
636+
mongo_field_name = "_id" if field_name == "id" else field_name
637+
mongo_field_value = field_value
638+
639+
# Convert id string to ObjectId if searching by _id
640+
if mongo_field_name == "_id" and isinstance(mongo_field_value, str):
641+
try:
642+
mongo_field_value = ObjectId(mongo_field_value)
643+
except Exception:
644+
pass
645+
646+
# Build base query
647+
query: dict[str, Any] = {mongo_field_name: mongo_field_value}
648+
649+
# If cursor is provided, look up the cursor document's timestamp
650+
# Use compound comparison (created_at, _id) to handle timestamp ties
651+
if before_id or after_id:
652+
cursor_id = before_id or after_id
653+
try:
654+
cursor_object_id = ObjectId(cursor_id)
655+
except Exception:
656+
cursor_object_id = cursor_id
657+
658+
cursor_doc = self.collection.find_one({"_id": cursor_object_id})
659+
if cursor_doc and "created_at" in cursor_doc:
660+
cursor_timestamp = cursor_doc["created_at"]
661+
if before_id:
662+
# Get documents where:
663+
# - created_at < cursor_timestamp, OR
664+
# - created_at == cursor_timestamp AND _id < cursor_id (tie-breaker)
665+
query["$or"] = [
666+
{"created_at": {"$lt": cursor_timestamp}},
667+
{
668+
"created_at": cursor_timestamp,
669+
"_id": {"$lt": cursor_object_id},
670+
},
671+
]
672+
else: # after_id
673+
# Get documents where:
674+
# - created_at > cursor_timestamp, OR
675+
# - created_at == cursor_timestamp AND _id > cursor_id (tie-breaker)
676+
query["$or"] = [
677+
{"created_at": {"$gt": cursor_timestamp}},
678+
{
679+
"created_at": cursor_timestamp,
680+
"_id": {"$gt": cursor_object_id},
681+
},
682+
]
683+
684+
# Create a cursor
685+
db_cursor = self.collection.find(query)
686+
687+
# Apply sorting
688+
sort_by_items = list(sort_by.items()) if sort_by else []
689+
# Use ID for tiebreaking
690+
sort_by_items.append(("_id", 1))
691+
db_cursor = db_cursor.sort(sort_by_items)
692+
693+
# Apply limit if specified
694+
limit = limit or DEFAULT_PAGE_LIMIT
695+
db_cursor = db_cursor.limit(limit)
696+
697+
return [self._deserialize(doc) for doc in db_cursor]
698+
except Exception as e:
699+
raise ServiceError(
700+
message=f"Failed to find items by field with cursor in MongoDB: {e}",
701+
detail=str(e),
702+
) from e
703+
608704
@retry_write_operation()
609705
async def delete_by_field(self, field_name: str, field_value: Any) -> int:
610706
"""

agentex/src/api/routes/messages.py

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1+
from typing import Literal
2+
13
from fastapi import APIRouter
4+
from pydantic import Field
25

36
from src.api.schemas.authorization_types import (
47
AgentexResourceType,
@@ -14,10 +17,24 @@
1417
from src.domain.entities.task_messages import convert_task_message_content_to_entity
1518
from src.domain.use_cases.messages_use_case import DMessageUseCase
1619
from src.utils.authorization_shortcuts import DAuthorizedBodyId, DAuthorizedQuery
20+
from src.utils.model_utils import BaseModel
21+
from src.utils.pagination import decode_cursor, encode_cursor
1722

1823
router = APIRouter(prefix="/messages", tags=["Messages"])
1924

2025

26+
class PaginatedMessagesResponse(BaseModel):
27+
"""Response with cursor pagination metadata."""
28+
29+
data: list[TaskMessage] = Field(..., description="List of messages")
30+
next_cursor: str | None = Field(
31+
None, description="Cursor for fetching the next page of older messages"
32+
)
33+
has_more: bool = Field(
34+
False, description="Whether there are more messages to fetch"
35+
)
36+
37+
2138
@router.post(
2239
"/batch",
2340
response_model=list[TaskMessage],
@@ -118,6 +135,11 @@ async def list_messages(
118135
order_by: str | None = None,
119136
order_direction: str = "desc",
120137
) -> list[TaskMessage]:
138+
"""
139+
List messages for a task with offset-based pagination.
140+
141+
For cursor-based pagination with infinite scroll support, use /messages/paginated.
142+
"""
121143
task_message_entities = await message_use_case.list_messages(
122144
task_id=task_id,
123145
limit=limit,
@@ -132,6 +154,85 @@ async def list_messages(
132154
]
133155

134156

157+
@router.get(
158+
"/paginated",
159+
response_model=PaginatedMessagesResponse,
160+
)
161+
async def list_messages_paginated(
162+
task_id: DAuthorizedQuery(AgentexResourceType.task, AuthorizedOperationType.read),
163+
message_use_case: DMessageUseCase,
164+
limit: int = 50,
165+
cursor: str | None = None,
166+
direction: Literal["older", "newer"] = "older",
167+
) -> PaginatedMessagesResponse:
168+
"""
169+
List messages for a task with cursor-based pagination.
170+
171+
This endpoint is designed for infinite scroll UIs where new messages may arrive
172+
while paginating through older ones.
173+
174+
Args:
175+
task_id: The task ID to filter messages by
176+
limit: Maximum number of messages to return (default: 50)
177+
cursor: Opaque cursor string for pagination. Pass the `next_cursor` from
178+
a previous response to get the next page.
179+
direction: Pagination direction - "older" to get older messages (default),
180+
"newer" to get newer messages.
181+
182+
Returns:
183+
PaginatedMessagesResponse with:
184+
- data: List of messages (newest first when direction="older")
185+
- next_cursor: Cursor for fetching the next page (null if no more pages)
186+
- has_more: Whether there are more messages to fetch
187+
188+
Example:
189+
First request: GET /messages/paginated?task_id=xxx&limit=50
190+
Next page: GET /messages/paginated?task_id=xxx&limit=50&cursor=<next_cursor>
191+
"""
192+
# Decode cursor if provided
193+
before_id = None
194+
after_id = None
195+
if cursor:
196+
try:
197+
cursor_data = decode_cursor(cursor)
198+
if direction == "older":
199+
before_id = cursor_data.id
200+
else:
201+
after_id = cursor_data.id
202+
except ValueError:
203+
# Invalid cursor, ignore and return from start
204+
pass
205+
206+
# Fetch one extra to determine if there are more results
207+
task_message_entities = await message_use_case.list_messages(
208+
task_id=task_id,
209+
limit=limit + 1,
210+
page_number=1,
211+
order_by=None,
212+
order_direction="desc",
213+
before_id=before_id,
214+
after_id=after_id,
215+
)
216+
217+
# Check if there are more results
218+
has_more = len(task_message_entities) > limit
219+
task_message_entities = task_message_entities[:limit]
220+
221+
# Build next cursor from last message
222+
next_cursor = None
223+
if has_more and task_message_entities:
224+
last_message = task_message_entities[-1]
225+
next_cursor = encode_cursor(last_message.id, last_message.created_at)
226+
227+
messages = [TaskMessage.model_validate(entity) for entity in task_message_entities]
228+
229+
return PaginatedMessagesResponse(
230+
data=messages,
231+
next_cursor=next_cursor,
232+
has_more=has_more,
233+
)
234+
235+
135236
@router.get(
136237
"/{message_id}",
137238
response_model=TaskMessage,

agentex/src/domain/services/task_message_service.py

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,23 +49,43 @@ async def get_messages(
4949
page_number: int,
5050
order_by: str | None = None,
5151
order_direction: str = "desc",
52+
before_id: str | None = None,
53+
after_id: str | None = None,
5254
) -> list[TaskMessageEntity]:
5355
"""
54-
Get all messages for a specific task.
56+
Get all messages for a specific task with optional cursor-based pagination.
5557
5658
Args:
5759
task_id: The task ID
58-
limit: Optional limit on the number of messages to return
59-
order_by: Optional field name to order by (defaults to created_at)
60-
order_direction: Optional direction to order by ("asc" or "desc", defaults to "desc")
60+
limit: Maximum number of messages to return
61+
page_number: Page number for offset-based pagination
62+
order_by: Field name to order by (defaults to created_at)
63+
order_direction: Direction to order by ("asc" or "desc", defaults to "desc")
64+
before_id: Get messages created before this message ID (cursor pagination)
65+
after_id: Get messages created after this message ID (cursor pagination)
6166
6267
Returns:
6368
List of TaskMessageEntity objects for the task
69+
70+
Note:
71+
When using before_id or after_id, page_number is ignored.
6472
"""
6573
# Default to created_at descending (newest first)
6674
sort_field = order_by or "created_at"
6775
sort_direction = 1 if order_direction.lower() == "asc" else -1
6876

77+
# If cursor pagination is requested, use cursor-based query
78+
if before_id or after_id:
79+
return await self.repository.find_by_field_with_cursor(
80+
field_name="task_id",
81+
field_value=task_id,
82+
limit=limit,
83+
sort_by={sort_field: sort_direction},
84+
before_id=before_id,
85+
after_id=after_id,
86+
)
87+
88+
# Otherwise use standard offset-based pagination
6989
return await self.repository.find_by_field(
7090
"task_id",
7191
task_id,

agentex/src/domain/use_cases/messages_use_case.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,25 +111,35 @@ async def list_messages(
111111
page_number: int,
112112
order_by: str | None = None,
113113
order_direction: str = "desc",
114+
before_id: str | None = None,
115+
after_id: str | None = None,
114116
) -> list[TaskMessageEntity]:
115117
"""
116-
Get all messages for a task.
118+
Get all messages for a task with optional cursor-based pagination.
117119
118120
Args:
119121
task_id: The task ID
120-
limit: Optional limit on the number of messages to return
121-
order_by: Optional field name to order by (defaults to created_at)
122-
order_direction: Optional direction to order by ("asc" or "desc", defaults to "desc")
122+
limit: Maximum number of messages to return
123+
page_number: Page number for offset-based pagination
124+
order_by: Field name to order by (defaults to created_at)
125+
order_direction: Direction to order by ("asc" or "desc", defaults to "desc")
126+
before_id: Get messages created before this message ID (cursor pagination)
127+
after_id: Get messages created after this message ID (cursor pagination)
123128
124129
Returns:
125130
List of TaskMessageEntity objects for the task
131+
132+
Note:
133+
When using before_id or after_id, page_number is ignored.
126134
"""
127135
return await self.task_message_service.get_messages(
128136
task_id=task_id,
129137
limit=limit,
130138
page_number=page_number,
131139
order_by=order_by,
132140
order_direction=order_direction,
141+
before_id=before_id,
142+
after_id=after_id,
133143
)
134144

135145

0 commit comments

Comments
 (0)