Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 156 additions & 11 deletions apps/web/src/domains/chat/hooks/stream-message-updaters.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,27 +156,79 @@ describe("appendTextDelta", () => {
expect(result[1]!.textSegments).toEqual([{ type: "text", content: "Hello" }]);
});

it("opens a new bubble when messageId is provided but no row matches", () => {
// Boundary between LLM calls in the same turn: call 1 was finalized
// by `message_complete` (`isStreaming: false`), call 2's first delta
// arrives with a new messageId. The fresh delta MUST open a new
// bubble, not extend the finalized one.
it("folds a later LLM call's delta into the assistant tail, recording its id as an alias", () => {
/**
* A multi-LLM-call agent turn renders as one bubble: the second LLM
* call's first delta carries a fresh messageId, but it must fold into
* the current assistant run rather than open a duplicate bubble.
*/

// GIVEN the first LLM call has finalized into an assistant tail
const call1Final = makeAssistantMsg({
id: "row-A",
content: "Hello",
textSegments: [{ type: "text", content: "Hello" }],
contentOrder: [{ type: "text", id: "0" }],
isStreaming: false,
});

// WHEN the next LLM call's first delta arrives with a new messageId
const result = appendTextDelta([userMsg, call1Final], " world", "row-B");

expect(result).toHaveLength(3);
expect(result[2]!.id).toBe("row-B");
expect(result[2]!.content).toBe(" world");
expect(result[2]!.isStreaming).toBe(true);
// Original row untouched.
// THEN it extends the existing bubble instead of opening a new one
expect(result).toHaveLength(2);
const tail = result[1]!;
expect(tail.id).toBe("row-A");
expect(tail.content).toBe("Hello world");
expect(tail.isStreaming).toBe(true);

// AND the new messageId is recorded as an alias so later events for it
// (and the post-turn reconcile) resolve to this anchor
expect(tail.mergedMessageIds).toEqual(["row-B"]);
});

it("opens a new bubble when messageId is provided and the tail is a user row", () => {
/**
* A new agent turn always begins with a user row, so a delta whose id
* no row owns opens a fresh bubble when the tail is not assistant.
*/

// GIVEN the tail is a user message (start of a new turn)
// WHEN the first delta of the assistant reply arrives
const result = appendTextDelta([userMsg], "text", "row-B");

// THEN a fresh streaming bubble opens, keyed by the messageId
expect(result).toHaveLength(2);
expect(result[1]!.id).toBe("row-B");
expect(result[1]!.content).toBe("text");
expect(result[1]!.isStreaming).toBe(true);
expect(result[1]!.mergedMessageIds).toBeUndefined();
});

it("folds a delta whose id the anchor already lists as a merged alias", () => {
/**
* The backend merge collapses a run of reserved rows onto the first
* row's id and lists the rest as aliases. A live delta carrying one of
* those alias ids must resolve to the anchor, not open a duplicate.
*/

// GIVEN an anchor row that already owns "row-B" as a merged alias
const anchor = makeAssistantMsg({
id: "row-A",
content: "Hello",
textSegments: [{ type: "text", content: "Hello" }],
contentOrder: [{ type: "text", id: "0" }],
mergedMessageIds: ["row-B"],
});

// WHEN a delta arrives stamped with the aliased id
const result = appendTextDelta([userMsg, anchor], " world", "row-B");

// THEN it extends the anchor and leaves the alias set unchanged
expect(result).toHaveLength(2);
expect(result[1]!.id).toBe("row-A");
expect(result[1]!.content).toBe("Hello");
expect(result[1]!.content).toBe("Hello world");
expect(result[1]!.mergedMessageIds).toEqual(["row-B"]);
});

it("extends consecutive same-id deltas into a single row", () => {
Expand Down Expand Up @@ -503,6 +555,56 @@ describe("upsertToolCall", () => {
expect(result[1]!.toolCalls![0]!.id).toBe("tc-1");
});

it("folds a later LLM call's tool call into the assistant tail, recording its id as an alias", () => {
/**
* A multi-LLM-call turn renders as one bubble: a tool call from a
* later call (fresh messageId) folds into the current assistant run
* instead of opening a duplicate bubble.
*/

// GIVEN the first LLM call has finalized into an assistant tail
const call1Final = makeAssistantMsg({
id: "row-A",
content: "Hello",
isStreaming: false,
});

// WHEN the next LLM call's tool_use_start arrives with a new messageId
const result = upsertToolCall([userMsg, call1Final], toolCall, "row-B");

// THEN it folds into the existing bubble and records the alias
expect(result).toHaveLength(2);
const tail = result[1]!;
expect(tail.id).toBe("row-A");
expect(tail.isStreaming).toBe(true);
expect(tail.toolCalls).toHaveLength(1);
expect(tail.toolCalls![0]!.id).toBe("tc-1");
expect(tail.mergedMessageIds).toEqual(["row-B"]);
});

it("folds a tool call whose id the anchor already lists as a merged alias", () => {
/**
* A tool_use_start carrying an id the backend merge already folded
* onto the anchor must resolve to the anchor, not open a duplicate.
*/

// GIVEN an anchor row that already owns "row-B" as a merged alias
const anchor = makeAssistantMsg({
id: "row-A",
content: "Hello",
mergedMessageIds: ["row-B"],
});

// WHEN a tool call arrives stamped with the aliased id
const result = upsertToolCall([userMsg, anchor], toolCall, "row-B");

// THEN it folds into the anchor and leaves the alias set unchanged
expect(result).toHaveLength(2);
expect(result[1]!.id).toBe("row-A");
expect(result[1]!.toolCalls![0]!.id).toBe("tc-1");
expect(result[1]!.mergedMessageIds).toEqual(["row-B"]);
});

it("adopts messageId as the row id when opening a new bubble (no isOptimistic flag)", () => {
// Anchor protocol: every `tool_use_start` carries `messageId` from
// event zero — the daemon has committed to the assistant message
Expand Down Expand Up @@ -547,6 +649,49 @@ describe("attachSurface", () => {
expect(result[1]!.surfaces![0]!.surfaceId).toBe("surf-1");
});

it("attaches to an assistant row that owns the messageId as a merged alias", () => {
/**
* The backend merge lists later LLM-call ids as aliases on the anchor.
* A surface stamped with such an id must resolve to the anchor rather
* than open a duplicate bubble.
*/

// GIVEN an anchor row that already owns "row-B" as a merged alias
const anchor = makeAssistantMsg({
id: "row-A",
isStreaming: false,
mergedMessageIds: ["row-B"],
});

// WHEN a surface arrives stamped with the aliased id
const result = attachSurface([userMsg, anchor], surface, "row-B");

// THEN it attaches to the anchor and leaves the alias set unchanged
expect(result).toHaveLength(2);
expect(result[1]!.id).toBe("row-A");
expect(result[1]!.surfaces![0]!.surfaceId).toBe("surf-1");
expect(result[1]!.mergedMessageIds).toEqual(["row-B"]);
});

it("folds a later LLM call's surface into the assistant tail, recording its id as an alias", () => {
/**
* A surface from a later LLM call (fresh messageId) folds into the
* current assistant run instead of opening a duplicate bubble.
*/

// GIVEN the first LLM call has finalized into an assistant tail
const call1Final = makeAssistantMsg({ id: "row-A", isStreaming: false });

// WHEN the next LLM call's surface arrives with a new messageId
const result = attachSurface([userMsg, call1Final], surface, "row-B");

// THEN it attaches to the existing bubble and records the alias
expect(result).toHaveLength(2);
expect(result[1]!.id).toBe("row-A");
expect(result[1]!.surfaces![0]!.surfaceId).toBe("surf-1");
expect(result[1]!.mergedMessageIds).toEqual(["row-B"]);
});

it("falls back to the streaming-assistant tail when messageId is absent", () => {
const target = makeAssistantMsg({ id: "stream-1", isStreaming: true });
const result = attachSurface([userMsg, target], surface);
Expand Down
118 changes: 87 additions & 31 deletions apps/web/src/domains/chat/hooks/stream-message-updaters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,49 @@ export function tailIsStreamingAssistant(prev: DisplayMessage[]): boolean {
return !!last && last.role === "assistant" && !!last.isStreaming;
}

/** Whether the tail row is an assistant message, regardless of streaming. */
export function tailIsAssistant(prev: DisplayMessage[]): boolean {
const last = prev[prev.length - 1];
return !!last && last.role === "assistant";
}

/**
* Find the assistant row that owns `messageId` — by its primary `id` or by
* a folded `mergedMessageIds` alias.
*
* The daemon reserves a fresh `messageId` per LLM call within a single
* agent turn, and the backend's `mergeConsecutiveAssistantMessages`
* collapses that run onto the first row's id, listing the later ids as
* aliases. Matching on aliases — not just the primary id — lets a later
* LLM call's deltas and tool calls fold into the same anchor instead of
* opening a duplicate streaming bubble for an id the run already owns.
*/
function findAssistantRowIndexByMessageId(
prev: DisplayMessage[],
messageId: string,
): number {
return prev.findIndex(
(m) =>
m.role === "assistant" &&
(m.id === messageId || !!m.mergedMessageIds?.includes(messageId)),
);
}

/**
* Record `messageId` as a `mergedMessageIds` alias on `row` when it isn't
* already the row's primary id or a known alias. Mirrors the backend merge
* so a subsequent reconcile / SSE lookup by that id resolves to this row.
*/
function withMergedAlias(
row: DisplayMessage,
messageId: string | undefined,
): DisplayMessage {
if (!messageId || row.id === messageId) return row;
const existing = row.mergedMessageIds ?? [];
if (existing.includes(messageId)) return row;
return { ...row, mergedMessageIds: [...existing, messageId] };
}

// ---------------------------------------------------------------------------
// assistant_text_delta
// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -94,8 +137,9 @@ function appendTextIntoRow(
prev: DisplayMessage[],
idx: number,
text: string,
messageId?: string,
): DisplayMessage[] {
const row = prev[idx]!;
const row = withMergedAlias(prev[idx]!, messageId);
const segments = [...(row.textSegments ?? [])];
const order = [...(row.contentOrder ?? [])];
const lastOrderEntry = order[order.length - 1];
Expand Down Expand Up @@ -128,13 +172,20 @@ function appendTextIntoRow(
/**
* Apply an `assistant_text_delta` to the message array.
*
* **Id-keyed when `messageId` is present** (B2/B3 onward — stamped on
* every event from event zero of the turn). Looks up the matching
* assistant row and appends into it regardless of position. Covers the
* case where reconcile (or `assistant_turn_start`) landed the reserved
* row in the array ahead of the first delta — without id matching,
* `tailIsStreamingAssistant(prev)` returns false for that snapshot row
* and a duplicate streaming bubble opens with the same id.
* **Identity-keyed when `messageId` is present** (B2/B3 onward — stamped
* on every event from event zero of the turn). Looks up the assistant row
* that owns the id (primary id or merged alias) and appends into it
* regardless of position. Covers the case where reconcile (or
* `assistant_turn_start`) landed the reserved row in the array ahead of
* the first delta.
*
* When no row owns the id yet, the delta belongs to a later LLM call in
* the current agent turn (each call reserves a fresh messageId). A single
* turn renders as one bubble — the backend collapses the run of reserved
* rows onto the first row's id — so the delta folds into the current
* assistant tail (recording the id as an alias) rather than opening a
* duplicate bubble. Only a non-assistant tail (a new turn always begins
* with a user row) opens a fresh bubble.
*
* Falls back to tail-based decisioning when `messageId` is absent, for
* pre-B2 daemons not pinned by the B4 floor bump.
Expand All @@ -145,10 +196,11 @@ export function appendTextDelta(
messageId?: string,
): DisplayMessage[] {
if (messageId) {
const idx = prev.findIndex(
(m) => m.role === "assistant" && m.id === messageId,
);
if (idx >= 0) return appendTextIntoRow(prev, idx, text);
const idx = findAssistantRowIndexByMessageId(prev, messageId);
if (idx >= 0) return appendTextIntoRow(prev, idx, text, messageId);
if (tailIsAssistant(prev)) {
return appendTextIntoRow(prev, prev.length - 1, text, messageId);
}
return createStreamingBubble(prev, text, messageId);
Comment on lines +201 to 204

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Fold unmatched deltas into the active assistant row

When the user queues another message while a turn is still streaming, useSendMessage appends that queued user row to the end of messages (apps/web/src/domains/chat/hooks/use-send-message.ts:568). If the current turn then advances to a later LLM call, its fresh messageId is not owned by any row yet, so this branch sees a non-assistant tail and creates a new assistant bubble after the queued user instead of folding into the in-flight assistant message; the same per-LLM-call split this change is trying to avoid still occurs for queued-message conversations. Search backward for the current/latest assistant row before falling back to createStreamingBubble.

Useful? React with 👍 / 👎.

}

Expand Down Expand Up @@ -395,7 +447,10 @@ export function attachSurface(
let targetIdx = -1;

if (messageId) {
targetIdx = prev.findIndex((m) => m.id === messageId);
// Identity-keyed: resolve the row that owns this id by primary id or
// merged alias — a surface from a later LLM call carries an id the
// anchor may already list as an alias. See `appendTextDelta`.
targetIdx = findAssistantRowIndexByMessageId(prev, messageId);
}
if (targetIdx === -1) {
for (let i = prev.length - 1; i >= 0; i--) {
Expand Down Expand Up @@ -432,7 +487,7 @@ export function attachSurface(
timestamp: Date.now(),
});
} else {
const target = prev[targetIdx]!;
const target = withMergedAlias(prev[targetIdx]!, messageId);
if (
target.contentOrder?.some(
(e) => e.type === "surface" && e.id === surface.surfaceId,
Expand Down Expand Up @@ -546,31 +601,31 @@ export function completeSurface(
// ---------------------------------------------------------------------------

/**
* Insert or update a tool call on the streaming assistant tail bubble,
* creating a new bubble if the tail isn't a streaming assistant row.
* Insert or update a tool call on the current assistant bubble, creating a
* new bubble only when the tail isn't an assistant row.
*
* The bubble-creation decision is derived from `prev` itself — no shared
* latch passes through. Same finalization invariant as `appendTextDelta`:
* boundary events leave the tail with `isStreaming: false` (or non-
* assistant), so this updater opens a fresh bubble correctly.
* **Identity-keyed when `messageId` is present** — looks up the assistant
* row that owns the id (primary id or merged alias) and folds into it
* regardless of position. Mirrors `appendTextDelta`: when no row owns the
* id yet, the tool call belongs to a later LLM call in the current agent
* turn, so it folds into the assistant tail (recording the id as an alias)
* to keep the turn one bubble rather than splitting per call. Only a
* non-assistant tail opens a fresh bubble.
*
* **Id-keyed when `messageId` is present** — looks up the matching
* assistant row by id and folds into it regardless of position. Mirrors
* `appendTextDelta`'s behavior for the case where reconcile (or
* `assistant_turn_start`) landed the reserved row in the array ahead of
* the first `tool_use_start` — without id matching, a duplicate streaming
* bubble would open with the same anchor id.
* Falls back to tail-based decisioning when `messageId` is absent, for
* pre-anchor-protocol daemons.
*/
export function upsertToolCall(
prev: DisplayMessage[],
toolCall: ChatMessageToolCall,
messageId?: string,
): DisplayMessage[] {
if (messageId) {
const idx = prev.findIndex(
(m) => m.role === "assistant" && m.id === messageId,
);
if (idx >= 0) return upsertToolCallIntoRow(prev, idx, toolCall);
const idx = findAssistantRowIndexByMessageId(prev, messageId);
if (idx >= 0) return upsertToolCallIntoRow(prev, idx, toolCall, messageId);
if (tailIsAssistant(prev)) {
return upsertToolCallIntoRow(prev, prev.length - 1, toolCall, messageId);
}
} else if (tailIsStreamingAssistant(prev)) {
return upsertToolCallIntoRow(prev, prev.length - 1, toolCall);
}
Expand Down Expand Up @@ -606,8 +661,9 @@ function upsertToolCallIntoRow(
prev: DisplayMessage[],
idx: number,
toolCall: ChatMessageToolCall,
messageId?: string,
): DisplayMessage[] {
const row = prev[idx]!;
const row = withMergedAlias(prev[idx]!, messageId);
const existingIdx =
row.toolCalls?.findIndex((tc) => tc.id === toolCall.id) ?? -1;
const updated = [...prev];
Expand Down