From f1145e64e7d9f6d38d0ea40bb120c3dbbfd1071b Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sun, 31 May 2026 19:41:52 +0000 Subject: [PATCH] fix(web): fold per-LLM-call tool calls and deltas into one assistant bubble The daemon reserves a fresh messageId per LLM call within an agent turn, and the backend merge collapses the run onto the first row's id with the rest as mergedMessageIds aliases. The streaming updaters looked up rows by primary id only, so a later LLM call's events opened a duplicate streaming bubble for an id the run already owned. Resolve target rows by id OR merged alias, and when no row owns the id, fold into the assistant tail (recording the id as an alias) instead of opening a new bubble. A new turn always begins with a user row, so a non-assistant tail still opens a fresh bubble. Applies to text deltas, tool calls, and ui surfaces. Co-Authored-By: vargas@vellum.ai --- .../hooks/stream-message-updaters.test.ts | 167 ++++++++++++++++-- .../chat/hooks/stream-message-updaters.ts | 118 +++++++++---- 2 files changed, 243 insertions(+), 42 deletions(-) diff --git a/apps/web/src/domains/chat/hooks/stream-message-updaters.test.ts b/apps/web/src/domains/chat/hooks/stream-message-updaters.test.ts index 5cd52f9ff02..b5ce8a42e30 100644 --- a/apps/web/src/domains/chat/hooks/stream-message-updaters.test.ts +++ b/apps/web/src/domains/chat/hooks/stream-message-updaters.test.ts @@ -156,11 +156,14 @@ describe("appendTextDelta", () => { expect(result[1]!.textSegments).toEqual([{ type: "text", content: "Hello" }]); }); - it("opens a new bubble when messageId is provided but no row matches", () => { - // Boundary between LLM calls in the same turn: call 1 was finalized - // by `message_complete` (`isStreaming: false`), call 2's first delta - // arrives with a new messageId. The fresh delta MUST open a new - // bubble, not extend the finalized one. + it("folds a later LLM call's delta into the assistant tail, recording its id as an alias", () => { + /** + * A multi-LLM-call agent turn renders as one bubble: the second LLM + * call's first delta carries a fresh messageId, but it must fold into + * the current assistant run rather than open a duplicate bubble. + */ + + // GIVEN the first LLM call has finalized into an assistant tail const call1Final = makeAssistantMsg({ id: "row-A", content: "Hello", @@ -168,15 +171,64 @@ describe("appendTextDelta", () => { contentOrder: [{ type: "text", id: "0" }], isStreaming: false, }); + + // WHEN the next LLM call's first delta arrives with a new messageId const result = appendTextDelta([userMsg, call1Final], " world", "row-B"); - expect(result).toHaveLength(3); - expect(result[2]!.id).toBe("row-B"); - expect(result[2]!.content).toBe(" world"); - expect(result[2]!.isStreaming).toBe(true); - // Original row untouched. + // THEN it extends the existing bubble instead of opening a new one + expect(result).toHaveLength(2); + const tail = result[1]!; + expect(tail.id).toBe("row-A"); + expect(tail.content).toBe("Hello world"); + expect(tail.isStreaming).toBe(true); + + // AND the new messageId is recorded as an alias so later events for it + // (and the post-turn reconcile) resolve to this anchor + expect(tail.mergedMessageIds).toEqual(["row-B"]); + }); + + it("opens a new bubble when messageId is provided and the tail is a user row", () => { + /** + * A new agent turn always begins with a user row, so a delta whose id + * no row owns opens a fresh bubble when the tail is not assistant. + */ + + // GIVEN the tail is a user message (start of a new turn) + // WHEN the first delta of the assistant reply arrives + const result = appendTextDelta([userMsg], "text", "row-B"); + + // THEN a fresh streaming bubble opens, keyed by the messageId + expect(result).toHaveLength(2); + expect(result[1]!.id).toBe("row-B"); + expect(result[1]!.content).toBe("text"); + expect(result[1]!.isStreaming).toBe(true); + expect(result[1]!.mergedMessageIds).toBeUndefined(); + }); + + it("folds a delta whose id the anchor already lists as a merged alias", () => { + /** + * The backend merge collapses a run of reserved rows onto the first + * row's id and lists the rest as aliases. A live delta carrying one of + * those alias ids must resolve to the anchor, not open a duplicate. + */ + + // GIVEN an anchor row that already owns "row-B" as a merged alias + const anchor = makeAssistantMsg({ + id: "row-A", + content: "Hello", + textSegments: [{ type: "text", content: "Hello" }], + contentOrder: [{ type: "text", id: "0" }], + mergedMessageIds: ["row-B"], + }); + + // WHEN a delta arrives stamped with the aliased id + const result = appendTextDelta([userMsg, anchor], " world", "row-B"); + + // THEN it extends the anchor and leaves the alias set unchanged + expect(result).toHaveLength(2); expect(result[1]!.id).toBe("row-A"); - expect(result[1]!.content).toBe("Hello"); + expect(result[1]!.content).toBe("Hello world"); + expect(result[1]!.mergedMessageIds).toEqual(["row-B"]); }); it("extends consecutive same-id deltas into a single row", () => { @@ -503,6 +555,56 @@ describe("upsertToolCall", () => { expect(result[1]!.toolCalls![0]!.id).toBe("tc-1"); }); + it("folds a later LLM call's tool call into the assistant tail, recording its id as an alias", () => { + /** + * A multi-LLM-call turn renders as one bubble: a tool call from a + * later call (fresh messageId) folds into the current assistant run + * instead of opening a duplicate bubble. + */ + + // GIVEN the first LLM call has finalized into an assistant tail + const call1Final = makeAssistantMsg({ + id: "row-A", + content: "Hello", + isStreaming: false, + }); + + // WHEN the next LLM call's tool_use_start arrives with a new messageId + const result = upsertToolCall([userMsg, call1Final], toolCall, "row-B"); + + // THEN it folds into the existing bubble and records the alias + expect(result).toHaveLength(2); + const tail = result[1]!; + expect(tail.id).toBe("row-A"); + expect(tail.isStreaming).toBe(true); + expect(tail.toolCalls).toHaveLength(1); + expect(tail.toolCalls![0]!.id).toBe("tc-1"); + expect(tail.mergedMessageIds).toEqual(["row-B"]); + }); + + it("folds a tool call whose id the anchor already lists as a merged alias", () => { + /** + * A tool_use_start carrying an id the backend merge already folded + * onto the anchor must resolve to the anchor, not open a duplicate. + */ + + // GIVEN an anchor row that already owns "row-B" as a merged alias + const anchor = makeAssistantMsg({ + id: "row-A", + content: "Hello", + mergedMessageIds: ["row-B"], + }); + + // WHEN a tool call arrives stamped with the aliased id + const result = upsertToolCall([userMsg, anchor], toolCall, "row-B"); + + // THEN it folds into the anchor and leaves the alias set unchanged + expect(result).toHaveLength(2); + expect(result[1]!.id).toBe("row-A"); + expect(result[1]!.toolCalls![0]!.id).toBe("tc-1"); + expect(result[1]!.mergedMessageIds).toEqual(["row-B"]); + }); + it("adopts messageId as the row id when opening a new bubble (no isOptimistic flag)", () => { // Anchor protocol: every `tool_use_start` carries `messageId` from // event zero — the daemon has committed to the assistant message @@ -547,6 +649,49 @@ describe("attachSurface", () => { expect(result[1]!.surfaces![0]!.surfaceId).toBe("surf-1"); }); + it("attaches to an assistant row that owns the messageId as a merged alias", () => { + /** + * The backend merge lists later LLM-call ids as aliases on the anchor. + * A surface stamped with such an id must resolve to the anchor rather + * than open a duplicate bubble. + */ + + // GIVEN an anchor row that already owns "row-B" as a merged alias + const anchor = makeAssistantMsg({ + id: "row-A", + isStreaming: false, + mergedMessageIds: ["row-B"], + }); + + // WHEN a surface arrives stamped with the aliased id + const result = attachSurface([userMsg, anchor], surface, "row-B"); + + // THEN it attaches to the anchor and leaves the alias set unchanged + expect(result).toHaveLength(2); + expect(result[1]!.id).toBe("row-A"); + expect(result[1]!.surfaces![0]!.surfaceId).toBe("surf-1"); + expect(result[1]!.mergedMessageIds).toEqual(["row-B"]); + }); + + it("folds a later LLM call's surface into the assistant tail, recording its id as an alias", () => { + /** + * A surface from a later LLM call (fresh messageId) folds into the + * current assistant run instead of opening a duplicate bubble. + */ + + // GIVEN the first LLM call has finalized into an assistant tail + const call1Final = makeAssistantMsg({ id: "row-A", isStreaming: false }); + + // WHEN the next LLM call's surface arrives with a new messageId + const result = attachSurface([userMsg, call1Final], surface, "row-B"); + + // THEN it attaches to the existing bubble and records the alias + expect(result).toHaveLength(2); + expect(result[1]!.id).toBe("row-A"); + expect(result[1]!.surfaces![0]!.surfaceId).toBe("surf-1"); + expect(result[1]!.mergedMessageIds).toEqual(["row-B"]); + }); + it("falls back to the streaming-assistant tail when messageId is absent", () => { const target = makeAssistantMsg({ id: "stream-1", isStreaming: true }); const result = attachSurface([userMsg, target], surface); diff --git a/apps/web/src/domains/chat/hooks/stream-message-updaters.ts b/apps/web/src/domains/chat/hooks/stream-message-updaters.ts index 1ad4c85515a..d3fba181103 100644 --- a/apps/web/src/domains/chat/hooks/stream-message-updaters.ts +++ b/apps/web/src/domains/chat/hooks/stream-message-updaters.ts @@ -47,6 +47,49 @@ export function tailIsStreamingAssistant(prev: DisplayMessage[]): boolean { return !!last && last.role === "assistant" && !!last.isStreaming; } +/** Whether the tail row is an assistant message, regardless of streaming. */ +export function tailIsAssistant(prev: DisplayMessage[]): boolean { + const last = prev[prev.length - 1]; + return !!last && last.role === "assistant"; +} + +/** + * Find the assistant row that owns `messageId` — by its primary `id` or by + * a folded `mergedMessageIds` alias. + * + * The daemon reserves a fresh `messageId` per LLM call within a single + * agent turn, and the backend's `mergeConsecutiveAssistantMessages` + * collapses that run onto the first row's id, listing the later ids as + * aliases. Matching on aliases — not just the primary id — lets a later + * LLM call's deltas and tool calls fold into the same anchor instead of + * opening a duplicate streaming bubble for an id the run already owns. + */ +function findAssistantRowIndexByMessageId( + prev: DisplayMessage[], + messageId: string, +): number { + return prev.findIndex( + (m) => + m.role === "assistant" && + (m.id === messageId || !!m.mergedMessageIds?.includes(messageId)), + ); +} + +/** + * Record `messageId` as a `mergedMessageIds` alias on `row` when it isn't + * already the row's primary id or a known alias. Mirrors the backend merge + * so a subsequent reconcile / SSE lookup by that id resolves to this row. + */ +function withMergedAlias( + row: DisplayMessage, + messageId: string | undefined, +): DisplayMessage { + if (!messageId || row.id === messageId) return row; + const existing = row.mergedMessageIds ?? []; + if (existing.includes(messageId)) return row; + return { ...row, mergedMessageIds: [...existing, messageId] }; +} + // --------------------------------------------------------------------------- // assistant_text_delta // --------------------------------------------------------------------------- @@ -94,8 +137,9 @@ function appendTextIntoRow( prev: DisplayMessage[], idx: number, text: string, + messageId?: string, ): DisplayMessage[] { - const row = prev[idx]!; + const row = withMergedAlias(prev[idx]!, messageId); const segments = [...(row.textSegments ?? [])]; const order = [...(row.contentOrder ?? [])]; const lastOrderEntry = order[order.length - 1]; @@ -128,13 +172,20 @@ function appendTextIntoRow( /** * Apply an `assistant_text_delta` to the message array. * - * **Id-keyed when `messageId` is present** (B2/B3 onward — stamped on - * every event from event zero of the turn). Looks up the matching - * assistant row and appends into it regardless of position. Covers the - * case where reconcile (or `assistant_turn_start`) landed the reserved - * row in the array ahead of the first delta — without id matching, - * `tailIsStreamingAssistant(prev)` returns false for that snapshot row - * and a duplicate streaming bubble opens with the same id. + * **Identity-keyed when `messageId` is present** (B2/B3 onward — stamped + * on every event from event zero of the turn). Looks up the assistant row + * that owns the id (primary id or merged alias) and appends into it + * regardless of position. Covers the case where reconcile (or + * `assistant_turn_start`) landed the reserved row in the array ahead of + * the first delta. + * + * When no row owns the id yet, the delta belongs to a later LLM call in + * the current agent turn (each call reserves a fresh messageId). A single + * turn renders as one bubble — the backend collapses the run of reserved + * rows onto the first row's id — so the delta folds into the current + * assistant tail (recording the id as an alias) rather than opening a + * duplicate bubble. Only a non-assistant tail (a new turn always begins + * with a user row) opens a fresh bubble. * * Falls back to tail-based decisioning when `messageId` is absent, for * pre-B2 daemons not pinned by the B4 floor bump. @@ -145,10 +196,11 @@ export function appendTextDelta( messageId?: string, ): DisplayMessage[] { if (messageId) { - const idx = prev.findIndex( - (m) => m.role === "assistant" && m.id === messageId, - ); - if (idx >= 0) return appendTextIntoRow(prev, idx, text); + const idx = findAssistantRowIndexByMessageId(prev, messageId); + if (idx >= 0) return appendTextIntoRow(prev, idx, text, messageId); + if (tailIsAssistant(prev)) { + return appendTextIntoRow(prev, prev.length - 1, text, messageId); + } return createStreamingBubble(prev, text, messageId); } @@ -395,7 +447,10 @@ export function attachSurface( let targetIdx = -1; if (messageId) { - targetIdx = prev.findIndex((m) => m.id === messageId); + // Identity-keyed: resolve the row that owns this id by primary id or + // merged alias — a surface from a later LLM call carries an id the + // anchor may already list as an alias. See `appendTextDelta`. + targetIdx = findAssistantRowIndexByMessageId(prev, messageId); } if (targetIdx === -1) { for (let i = prev.length - 1; i >= 0; i--) { @@ -432,7 +487,7 @@ export function attachSurface( timestamp: Date.now(), }); } else { - const target = prev[targetIdx]!; + const target = withMergedAlias(prev[targetIdx]!, messageId); if ( target.contentOrder?.some( (e) => e.type === "surface" && e.id === surface.surfaceId, @@ -546,20 +601,19 @@ export function completeSurface( // --------------------------------------------------------------------------- /** - * Insert or update a tool call on the streaming assistant tail bubble, - * creating a new bubble if the tail isn't a streaming assistant row. + * Insert or update a tool call on the current assistant bubble, creating a + * new bubble only when the tail isn't an assistant row. * - * The bubble-creation decision is derived from `prev` itself — no shared - * latch passes through. Same finalization invariant as `appendTextDelta`: - * boundary events leave the tail with `isStreaming: false` (or non- - * assistant), so this updater opens a fresh bubble correctly. + * **Identity-keyed when `messageId` is present** — looks up the assistant + * row that owns the id (primary id or merged alias) and folds into it + * regardless of position. Mirrors `appendTextDelta`: when no row owns the + * id yet, the tool call belongs to a later LLM call in the current agent + * turn, so it folds into the assistant tail (recording the id as an alias) + * to keep the turn one bubble rather than splitting per call. Only a + * non-assistant tail opens a fresh bubble. * - * **Id-keyed when `messageId` is present** — looks up the matching - * assistant row by id and folds into it regardless of position. Mirrors - * `appendTextDelta`'s behavior for the case where reconcile (or - * `assistant_turn_start`) landed the reserved row in the array ahead of - * the first `tool_use_start` — without id matching, a duplicate streaming - * bubble would open with the same anchor id. + * Falls back to tail-based decisioning when `messageId` is absent, for + * pre-anchor-protocol daemons. */ export function upsertToolCall( prev: DisplayMessage[], @@ -567,10 +621,11 @@ export function upsertToolCall( messageId?: string, ): DisplayMessage[] { if (messageId) { - const idx = prev.findIndex( - (m) => m.role === "assistant" && m.id === messageId, - ); - if (idx >= 0) return upsertToolCallIntoRow(prev, idx, toolCall); + const idx = findAssistantRowIndexByMessageId(prev, messageId); + if (idx >= 0) return upsertToolCallIntoRow(prev, idx, toolCall, messageId); + if (tailIsAssistant(prev)) { + return upsertToolCallIntoRow(prev, prev.length - 1, toolCall, messageId); + } } else if (tailIsStreamingAssistant(prev)) { return upsertToolCallIntoRow(prev, prev.length - 1, toolCall); } @@ -606,8 +661,9 @@ function upsertToolCallIntoRow( prev: DisplayMessage[], idx: number, toolCall: ChatMessageToolCall, + messageId?: string, ): DisplayMessage[] { - const row = prev[idx]!; + const row = withMergedAlias(prev[idx]!, messageId); const existingIdx = row.toolCalls?.findIndex((tc) => tc.id === toolCall.id) ?? -1; const updated = [...prev];