Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 26 additions & 4 deletions apps/web/src/domains/chat/api/event-parser.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,25 @@ describe("parseAssistantEvent", () => {
test("parses message_complete with content", () => {
const event = parseAssistantEvent("message_complete", {
messageId: "msg-1",
displayMessageId: "display-msg-1",
content: "Full response",
});
expect(event).toEqual({
type: "message_complete",
messageId: "msg-1",
displayMessageId: "display-msg-1",
content: "Full response",
attachments: undefined,
});
});

test("ignores legacy displayMessageId on message_complete", () => {
const event = parseAssistantEvent("message_complete", {
messageId: "msg-1",
displayMessageId: "ignored",
content: "Full response",
});
expect(event).toEqual({
type: "message_complete",
messageId: "msg-1",
content: "Full response",
attachments: undefined,
});
Expand Down Expand Up @@ -125,12 +137,22 @@ describe("parseAssistantEvent", () => {
test("parses generation_handoff", () => {
const event = parseAssistantEvent("generation_handoff", {
messageId: "msg-1",
displayMessageId: "display-msg-1",
});
expect(event).toEqual({
type: "generation_handoff",
messageId: "msg-1",
displayMessageId: "display-msg-1",
attachments: undefined,
});
});

test("ignores legacy displayMessageId on generation_handoff", () => {
const event = parseAssistantEvent("generation_handoff", {
messageId: "msg-1",
displayMessageId: "ignored",
});
expect(event).toEqual({
type: "generation_handoff",
messageId: "msg-1",
attachments: undefined,
});
});
Expand Down
6 changes: 0 additions & 6 deletions apps/web/src/domains/chat/api/event-parser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,6 @@ export function parseAssistantEvent(
type: "message_complete",
messageId:
typeof data.messageId === "string" ? data.messageId : undefined,
...(typeof data.displayMessageId === "string"
? { displayMessageId: data.displayMessageId }
: {}),
content:
typeof data.content === "string" ? data.content : undefined,
attachments: parseOutboundAttachments(data.attachments),
Expand All @@ -109,9 +106,6 @@ export function parseAssistantEvent(
type: "generation_handoff",
messageId:
typeof data.messageId === "string" ? data.messageId : undefined,
...(typeof data.displayMessageId === "string"
? { displayMessageId: data.displayMessageId }
: {}),
attachments: parseOutboundAttachments(data.attachments),
};

Expand Down
2 changes: 0 additions & 2 deletions apps/web/src/domains/chat/api/event-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ export interface AssistantOutboundAttachment {
export interface MessageCompleteEvent {
type: "message_complete";
messageId?: string;
displayMessageId?: string;
content?: string;
conversationId?: string;
attachments?: AssistantOutboundAttachment[];
Expand All @@ -121,7 +120,6 @@ export interface MessageCompleteEvent {
export interface GenerationHandoffEvent {
type: "generation_handoff";
messageId?: string;
displayMessageId?: string;
conversationId?: string;
attachments?: AssistantOutboundAttachment[];
}
Expand Down
138 changes: 134 additions & 4 deletions apps/web/src/domains/chat/hooks/stream-message-updaters.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
applyToolProgress,
applyToolResult,
createStreamingBubble,
finalizeMessageComplete,
finalizeOnIdle,
handleConversationError,
stopStreaming,
Expand Down Expand Up @@ -123,6 +124,134 @@ describe("appendTextDelta", () => {
appendTextDelta(prev, "b");
expect(prev[0]!.content).toBe("a");
});

it("locks bubble.id to the first id seen — later text_deltas don't overwrite", () => {
// Multi-LLM-call turn: call 1's first text_delta opens the bubble with
// id=A, call 2's text_delta arrives with id=B. The bubble's id must
// stay A (anchor preservation) — the daemon's server-side merge will
// collapse the rows to the first row's id, and the live view must
// match.
const start = createStreamingBubble([userMsg], "Hello", "row-A");
expect(start[1]!.id).toBe("row-A");
const result = appendTextDelta(start, " world", "row-B");
expect(result[1]!.id).toBe("row-A");
expect(result[1]!.content).toBe("Hello world");
});

it("backfills bubble.id when initial bubble had no id", () => {
const start = createStreamingBubble([userMsg], "Hello");
expect(start[1]!.id).toBeUndefined();
const result = appendTextDelta(start, " world", "row-A");
expect(result[1]!.id).toBe("row-A");
});
});

// ---------------------------------------------------------------------------
// finalizeMessageComplete
// ---------------------------------------------------------------------------

describe("finalizeMessageComplete", () => {
it("opens a new finalized assistant bubble when tail is a user message", () => {
const result = finalizeMessageComplete([userMsg], {
type: "message_complete",
conversationId: "c-1",
messageId: "row-A",
content: "done",
});

expect(result).toHaveLength(2);
expect(result[1]!.role).toBe("assistant");
expect(result[1]!.id).toBe("row-A");
expect(result[1]!.content).toBe("done");
expect(result[1]!.isStreaming).toBeUndefined();
});

it("opens a new bubble when prev is empty", () => {
const result = finalizeMessageComplete([], {
type: "message_complete",
conversationId: "c-1",
messageId: "row-A",
content: "first",
});
expect(result).toHaveLength(1);
expect(result[0]!.id).toBe("row-A");
});

it("returns prev unchanged when tail is user and event has no content/attachments", () => {
const prev = [userMsg];
const result = finalizeMessageComplete(prev, {
type: "message_complete",
conversationId: "c-1",
messageId: "row-A",
});
expect(result).toBe(prev);
});

it("finalizes a streaming assistant tail and keeps tail.id (anchor preservation)", () => {
const msg = makeAssistantMsg({ id: "bubble-anchor", content: "hello" });
const result = finalizeMessageComplete([userMsg, msg], {
type: "message_complete",
conversationId: "c-1",
messageId: "inner-row-id",
content: "hello world",
});

expect(result).toHaveLength(2);
expect(result[1]!.id).toBe("bubble-anchor");
expect(result[1]!.isStreaming).toBe(false);
expect(result[1]!.content).toBe("hello world");
});

it("finalizes running tool calls when finalizing", () => {
const toolCall: ChatMessageToolCall = {
id: "t-1",
toolName: "bash",
input: { command: "ls" },
status: "running",
};
const msg = makeAssistantMsg({ id: "bubble-A", toolCalls: [toolCall] });
const result = finalizeMessageComplete([msg], {
type: "message_complete",
conversationId: "c-1",
messageId: "row-B",
});
expect(result[0]!.toolCalls?.[0]!.status).toBe("completed");
});

it("appends to a finalized assistant tail without overwriting its id (multi-LLM-call turn)", () => {
// Second message_complete in the same agent turn — tail is the bubble
// from the previous call (isStreaming already false). Should keep id.
const tail = makeAssistantMsg({
id: "bubble-anchor",
content: "first call done",
isStreaming: false,
});
const result = finalizeMessageComplete([userMsg, tail], {
type: "message_complete",
conversationId: "c-1",
messageId: "row-B",
content: "second call done",
});

expect(result).toHaveLength(2);
expect(result[1]!.id).toBe("bubble-anchor");
expect(result[1]!.content).toBe("second call done");
});

it("ignores legacy displayMessageId on the wire", () => {
// Inbound from an older daemon: a `displayMessageId` field on
// message_complete must be silently ignored — the new contract is
// messageId-only on the wire, anchor preservation is client-side.
const tail = makeAssistantMsg({ id: "bubble-anchor" });
const legacyEvent = {
type: "message_complete" as const,
conversationId: "c-1",
messageId: "row-B",
displayMessageId: "row-A",
} as Parameters<typeof finalizeMessageComplete>[1];
const result = finalizeMessageComplete([tail], legacyEvent);
expect(result[0]!.id).toBe("bubble-anchor");
});
});

// ---------------------------------------------------------------------------
Expand All @@ -146,10 +275,11 @@ describe("stopStreaming", () => {
expect(result).toBe(prev);
});

it("applies optional displayMessageId", () => {
const msg = makeAssistantMsg();
const result = stopStreaming([msg], { displayMessageId: "d-1" });
expect(result[0]!.id).toBe("d-1");
it("keeps tail.id — never stamps a different id onto the bubble", () => {
const msg = makeAssistantMsg({ id: "bubble-anchor" });
const result = stopStreaming([msg]);
expect(result[0]!.id).toBe("bubble-anchor");
expect(result[0]!.isStreaming).toBe(false);
});
});

Expand Down
90 changes: 44 additions & 46 deletions apps/web/src/domains/chat/hooks/stream-message-updaters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@
* @see https://react.dev/reference/react/useState#updating-state-based-on-the-previous-state
*/

import type { DisplayAttachment, DisplayMessage } from "@/domains/chat/utils/reconcile.js";
import type { DisplayMessage } from "@/domains/chat/utils/reconcile.js";
import type { Surface } from "@/domains/chat/types/types.js";
import { newStableId } from "@/domains/chat/utils/stable-id.js";
import type { AllowlistOption, ChatMessageToolCall, DirectoryScopeOption, ScopeOption } from "@/domains/chat/api/event-types.js";
import { toDisplayAttachments } from "@/domains/chat/api/event-parser.js";
import type { AllowlistOption, ChatMessageToolCall, DirectoryScopeOption, MessageCompleteEvent, ScopeOption } from "@/domains/chat/api/event-types.js";
import type { ToolActivityMetadata } from "@/assistant/web-activity-types.js";

// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -116,7 +117,7 @@ export function appendTextDelta(
{
...last,
content: last.content + text,
id: messageId ?? last.id,
id: last.id ?? messageId,
textSegments: segments,
contentOrder: order,
},
Expand Down Expand Up @@ -157,70 +158,68 @@ export function finalizeOnIdle(prev: DisplayMessage[]): DisplayMessage[] {
// message_complete
// ---------------------------------------------------------------------------

/** Finalize a streaming message with its completed content and attachments. */
/**
* Apply a `message_complete` event to the message array.
*
* Decision is role-based on the tail:
* - tail is user (or array empty) → push a new finalized assistant bubble
* stamped with `event.messageId`. This covers the start-of-turn case
* where no streaming bubble was opened (e.g. tool-only or aux turns).
* - tail is assistant → finalize it: flip `isStreaming: false`, complete
* any running tool calls, merge in `event.content` / `event.attachments`,
* **keep `tail.id`**. Subsequent `message_complete` events from later
* LLM calls in the same agent turn fold into the same bubble — the
* mirror of the daemon's server-side merge which collapses to the first
* row's id.
*/
export function finalizeMessageComplete(
prev: DisplayMessage[],
opts: {
content?: string;
displayMessageId?: string;
attachments?: DisplayAttachment[];
},
event: MessageCompleteEvent,
): DisplayMessage[] {
const { content, displayMessageId, attachments } = opts;
const last = prev[prev.length - 1];
const attachments = toDisplayAttachments(event.attachments);

if (last?.role === "assistant" && last.isStreaming) {
const finalized = finalizeRunningToolCalls(last.toolCalls);
return [
...prev.slice(0, -1),
{
...last,
isStreaming: false,
id: displayMessageId ?? last.id,
content: content || last.content,
...(attachments ? { attachments } : {}),
...(finalized ? { toolCalls: finalized } : {}),
},
];
}

if (content || attachments) {
if (displayMessageId && prev.some((m) => m.id === displayMessageId)) {
return prev.map((m) =>
m.id === displayMessageId
? {
...m,
...(content ? { content } : {}),
...(attachments && !m.attachments ? { attachments } : {}),
}
: m,
);
}
if (last?.role !== "assistant") {
if (!event.content && !attachments) return prev;
return [
...prev,
{
stableId: newStableId("assistant-complete"),
id: displayMessageId,
id: event.messageId,
role: "assistant" as const,
content: content ?? "",
content: event.content ?? "",
timestamp: Date.now(),
...(attachments ? { attachments } : {}),
},
];
}

return prev;
const finalized = finalizeRunningToolCalls(last.toolCalls);
return [
...prev.slice(0, -1),
{
...last,
isStreaming: false,
// Keep `last.id` — the anchor was locked by the first text_delta /
// tool_use of the turn. The daemon may advance its internal row id
// across multiple LLM calls, but each call's `event.messageId` is
// just a constituent of this display row.
content: event.content || last.content,
...(attachments ? { attachments } : {}),
...(finalized ? { toolCalls: finalized } : {}),
},
];
}

// ---------------------------------------------------------------------------
// generation_handoff / stream stop
// ---------------------------------------------------------------------------

/** Stop streaming on the last assistant message (handoff or cancellation). */
export function stopStreaming(
prev: DisplayMessage[],
opts?: { displayMessageId?: string },
): DisplayMessage[] {
/**
* Stop streaming on the tail assistant bubble (handoff or cancellation).
* Keeps `tail.id` — same anchor preservation as `finalizeMessageComplete`.
*/
export function stopStreaming(prev: DisplayMessage[]): DisplayMessage[] {
const last = prev[prev.length - 1];
if (!last || last.role !== "assistant" || !last.isStreaming) return prev;

Expand All @@ -229,7 +228,6 @@ export function stopStreaming(
{
...last,
isStreaming: false,
...(opts?.displayMessageId ? { id: opts.displayMessageId } : {}),
},
];
}
Expand Down
Loading
Loading