Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions apps/web/src/domains/chat/chat-page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,6 @@ export function ChatPage() {
const streamEpochRef = useRef(0);
const streamContextRef = useRef<{ assistantId: string; conversationId: string } | null>(null);
const reconcileAfterNextStreamOpenRef = useRef(false);
const needsNewBubbleRef = useRef(true);
const dismissedSurfaceIdsRef = useRef<Set<string>>(new Set());
const pendingOnboardingContextRef = useRef<PreChatOnboardingContext | null>(null);
const onboardingDraftConversationIdRef = useRef<string | null>(null);
Expand Down Expand Up @@ -381,11 +380,6 @@ export function ChatPage() {
);
const isChannelReadonly = isChannelConversation(activeConversation);

const syncNeedsNewBubbleFromMessages = useCallback((nextMessages: DisplayMessage[]) => {
const lastMsg = nextMessages[nextMessages.length - 1];
needsNewBubbleRef.current = !lastMsg || lastMsg.role !== "assistant" || !lastMsg.isStreaming;
}, []);

// -------------------------------------------------------------------------
// Conversation loader
// -------------------------------------------------------------------------
Expand Down Expand Up @@ -414,7 +408,6 @@ export function ChatPage() {
activeConversationIdRef,
contextWindowUsageByConversationRef,
dismissedSurfaceIdsRef,
needsNewBubbleRef,
streamingMessageIdsRef,
pendingQueuedStableIdsRef,
requestIdToStableIdRef,
Expand All @@ -434,7 +427,6 @@ export function ChatPage() {
setSuggestion,
setCompactionCircuitOpenUntil,
resetChatAttachments,
syncNeedsNewBubbleFromMessages,
shouldSuppressGenericChatErrorNotice,
});

Expand Down Expand Up @@ -567,7 +559,6 @@ export function ChatPage() {
assistantIdRef,
setMessages,
messagesRef,
needsNewBubbleRef,
setError,
streamRef,
cancelReconciliation,
Expand Down Expand Up @@ -610,7 +601,6 @@ export function ChatPage() {
streamRef,
streamContextRef,
streamEpochRef,
needsNewBubbleRef,
dismissedSurfaceIdsRef,
pendingOnboardingContextRef,
onboardingDraftConversationIdRef,
Expand Down
78 changes: 56 additions & 22 deletions apps/web/src/domains/chat/hooks/stream-message-updaters.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,17 +86,35 @@ describe("appendTextDelta", () => {
expect(last.content).toBe("Hello");
});

it("returns prev unchanged if last message is not streaming assistant", () => {
const msg = makeAssistantMsg({ isStreaming: false });
const prev = [userMsg, msg];
const result = appendTextDelta(prev, "text");
expect(result).toBe(prev);
it("creates a new streaming bubble when last message is a finalized assistant", () => {
// Old behavior was a no-op; new behavior opens a fresh bubble so the
// text-delta isn't dropped on the floor. This is the bubble-creation
// path that used to be gated by `needsNewBubbleRef`.
const finalized = makeAssistantMsg({ isStreaming: false });
const result = appendTextDelta([userMsg, finalized], "text");

expect(result).toHaveLength(3);
expect(result[2]!.role).toBe("assistant");
expect(result[2]!.isStreaming).toBe(true);
expect(result[2]!.content).toBe("text");
});

it("returns prev unchanged if last message is a user message", () => {
const prev = [userMsg];
const result = appendTextDelta(prev, "text");
expect(result).toBe(prev);
it("creates a new streaming bubble when last message is a user message", () => {
// Initial assistant turn (no prior assistant bubble at all) — first
// text delta must spawn the bubble rather than no-op.
const result = appendTextDelta([userMsg], "text");

expect(result).toHaveLength(2);
expect(result[1]!.role).toBe("assistant");
expect(result[1]!.isStreaming).toBe(true);
expect(result[1]!.content).toBe("text");
});

it("uses stableId when creating a new bubble", () => {
const result = appendTextDelta([userMsg], "text", "stable-xyz");

expect(result).toHaveLength(2);
expect(result[1]!.id).toBe("stable-xyz");
});

it("does not mutate the original array", () => {
Expand Down Expand Up @@ -199,9 +217,9 @@ describe("upsertToolCall", () => {
status: "running" as const,
};

it("appends tool call to existing streaming message", () => {
it("appends tool call to existing streaming assistant tail", () => {
const msg = makeAssistantMsg({ toolCalls: undefined });
const result = upsertToolCall([userMsg, msg], toolCall, false);
const result = upsertToolCall([userMsg, msg], toolCall);

expect(result).toHaveLength(2);
expect(result[1]!.toolCalls).toHaveLength(1);
Expand All @@ -214,24 +232,26 @@ describe("upsertToolCall", () => {
toolCalls: [{ id: "tc-1", toolName: "old_name", input: {}, status: "running" as const }],
});
const updatedTc = { id: "tc-1", toolName: "web_search", input: {} as Record<string, unknown>, status: "running" as const };
const result = upsertToolCall([msg], updatedTc, false);
const result = upsertToolCall([msg], updatedTc);

expect(result[0]!.toolCalls).toHaveLength(1);
expect(result[0]!.toolCalls![0]!.toolName).toBe("web_search");
});

it("creates new bubble when shouldCreateNewBubble is true", () => {
const msg = makeAssistantMsg();
const result = upsertToolCall([userMsg, msg], toolCall, true);
it("creates a new bubble when the tail is a finalized assistant", () => {
// Finalized assistant tail (isStreaming: false) → derivation says
// "open a fresh bubble" rather than extend the previous turn.
const finalized = makeAssistantMsg({ isStreaming: false });
const result = upsertToolCall([userMsg, finalized], toolCall);

expect(result).toHaveLength(3);
expect(result[2]!.role).toBe("assistant");
expect(result[2]!.isStreaming).toBe(true);
expect(result[2]!.toolCalls).toHaveLength(1);
});

it("creates new bubble when no streaming assistant message exists", () => {
const result = upsertToolCall([userMsg], toolCall, false);
it("creates a new bubble when no streaming assistant tail exists", () => {
const result = upsertToolCall([userMsg], toolCall);

expect(result).toHaveLength(2);
expect(result[1]!.role).toBe("assistant");
Expand All @@ -241,7 +261,7 @@ describe("upsertToolCall", () => {
it("does not mutate existing messages", () => {
const msg = makeAssistantMsg({ toolCalls: [] });
const prev = [msg];
upsertToolCall(prev, toolCall, false);
upsertToolCall(prev, toolCall);
expect(prev[0]!.toolCalls).toHaveLength(0);
});
});
Expand Down Expand Up @@ -411,15 +431,29 @@ describe("finalizeOnIdle", () => {
expect(result).toBe(prev);
});

it("returns prev unchanged when streaming messages have no running tool calls", () => {
it("flips isStreaming to false even when streaming messages have no running tool calls", () => {
// New behavior (replaces what `needsNewBubbleRef` used to carry): the
// tail must transition out of "streaming" state on idle regardless of
// tool-call presence, so the next chunk derives "open a new bubble".
const msg = makeAssistantMsg({
toolCalls: [
{ id: "tc-1", toolName: "web_search", input: {}, status: "completed" },
],
});
const prev = [msg];
const result = finalizeOnIdle(prev);
expect(result).toBe(prev);
const result = finalizeOnIdle([msg]);

expect(result).toHaveLength(1);
expect(result[0]!.isStreaming).toBe(false);
// Already-completed tool calls remain untouched.
expect(result[0]!.toolCalls![0]!.status).toBe("completed");
});

it("flips isStreaming to false on a streaming assistant with no tool calls at all", () => {
const msg = makeAssistantMsg({ toolCalls: undefined });
const result = finalizeOnIdle([msg]);

expect(result).toHaveLength(1);
expect(result[0]!.isStreaming).toBe(false);
});

it("does not modify non-streaming assistant messages", () => {
Expand Down
75 changes: 59 additions & 16 deletions apps/web/src/domains/chat/hooks/stream-message-updaters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,20 @@ export function finalizeRunningToolCalls(
);
}

/**
* Whether the next streaming chunk should extend the tail bubble or start
* a fresh one. Derived directly from the message array — the boundary
* events that previously latched this decision (idle, message_complete,
* generation_handoff, generation_cancelled, dequeued, conversation switch)
* all leave the tail in a state where `isStreaming` is either `false` or
* the tail is no longer an assistant row, so the derivation answers
* correctly without any shared flag.
*/
export function tailIsStreamingAssistant(prev: DisplayMessage[]): boolean {
const last = prev[prev.length - 1];
return !!last && last.role === "assistant" && !!last.isStreaming;
}

// ---------------------------------------------------------------------------
// assistant_text_delta
// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -58,15 +72,28 @@ export function createStreamingBubble(
];
}

/** Append text to the last streaming assistant bubble. */
/**
* Append text to the streaming assistant tail bubble, creating one if the
* tail isn't a streaming assistant row.
*
* The "should I open a new bubble" question is derived from the message
* array itself — when the previous turn finalized (via `finalizeOnIdle`,
* `finalizeMessageComplete`, `stopStreaming`, conversation switch, or a
* user message append), the tail's `isStreaming` flag is already false
* (or the tail is no longer an assistant row), so this updater branches
* to `createStreamingBubble` without needing a shared latch.
*/
export function appendTextDelta(
prev: DisplayMessage[],
text: string,
messageId?: string,
stableId?: string,
): DisplayMessage[] {
const last = prev[prev.length - 1];
if (!last || last.role !== "assistant" || !last.isStreaming) return prev;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In our conversation focused on simplifying client state, let's add an item to our list for removing isStreaming from messages.

Messages don't have streaming state. they are just the packets of data. the only thing that has streaming state is the assistant itself on any given conversation

if (!tailIsStreamingAssistant(prev)) {
return createStreamingBubble(prev, text, messageId, stableId);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In our conversation focused on simplifying client state, let's add an item to our list for removing Bubble terminology.

  1. we don't even use chat bubbles anymore for our assistant messages
  2. even if we did, that is a rendering concern and not a state management concern

}

const last = prev[prev.length - 1]!;
const segments = [...(last.textSegments ?? [])];
const order = [...(last.contentOrder ?? [])];
const lastOrderEntry = order[order.length - 1];
Expand Down Expand Up @@ -102,14 +129,28 @@ export function appendTextDelta(
// assistant_activity_state (idle)
// ---------------------------------------------------------------------------

/** Finalize running tool calls on ALL streaming assistant messages (idle signal). */
/**
* Finalize all streaming assistant messages when the daemon signals turn
* idle. Sets `isStreaming: false` on each streaming assistant row and
* marks any running tool calls as completed.
*
* Flipping `isStreaming` here is what lets `appendTextDelta` /
* `upsertToolCall` derive "next chunk should open a new bubble" from the
* message array alone — without this, the previous turn's tail would
* still look like a streaming assistant when the next turn's first chunk
* arrives, and the next chunk would erroneously extend it.
*/
export function finalizeOnIdle(prev: DisplayMessage[]): DisplayMessage[] {
let changed = false;
const updated = prev.map((m) => {
if (m.role !== "assistant" || !m.isStreaming) return m;
if (!m.toolCalls?.some((tc) => tc.status === "running")) return m;
changed = true;
return { ...m, toolCalls: finalizeRunningToolCalls(m.toolCalls)! };
const finalized = finalizeRunningToolCalls(m.toolCalls);
return {
...m,
isStreaming: false,
...(finalized ? { toolCalls: finalized } : {}),
};
});
return changed ? updated : prev;
}
Expand Down Expand Up @@ -389,21 +430,23 @@ export function completeSurface(
// tool_use_start
// ---------------------------------------------------------------------------

/** Insert or update a tool call on an assistant message. */
/**
* Insert or update a tool call on the streaming assistant tail bubble,
* creating a new bubble if the tail isn't a streaming assistant row.
*
* The bubble-creation decision is derived from `prev` itself — no shared
* latch passes through. Same finalization invariant as `appendTextDelta`:
* boundary events leave the tail with `isStreaming: false` (or non-
* assistant), so this updater opens a fresh bubble correctly.
*/
export function upsertToolCall(
prev: DisplayMessage[],
toolCall: ChatMessageToolCall,
shouldCreateNewBubble: boolean,
stableId?: string,
): DisplayMessage[] {
const lastIdx = prev.length - 1;
const last = prev[lastIdx];

if (
!shouldCreateNewBubble &&
last?.role === "assistant" &&
last.isStreaming
) {
if (tailIsStreamingAssistant(prev)) {
const lastIdx = prev.length - 1;
const last = prev[lastIdx]!;
const existingIdx =
last.toolCalls?.findIndex((tc) => tc.id === toolCall.id) ?? -1;
if (existingIdx !== -1) {
Expand Down
7 changes: 0 additions & 7 deletions apps/web/src/domains/chat/hooks/use-conversation-history.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ interface UseConversationHistoryParams {

contextWindowUsageByConversationRef: MutableRefObject<Map<string, ContextWindowUsage>>;
dismissedSurfaceIdsRef: MutableRefObject<Set<string>>;
needsNewBubbleRef: MutableRefObject<boolean>;
streamingMessageIdsRef: MutableRefObject<Set<string>>;
pendingQueuedStableIdsRef: MutableRefObject<string[]>;
requestIdToStableIdRef: MutableRefObject<Map<string, string>>;
Expand All @@ -93,7 +92,6 @@ interface UseConversationHistoryParams {

// Callbacks
resetChatAttachments: () => void;
syncNeedsNewBubbleFromMessages: (nextMessages: DisplayMessage[]) => void;

// Error classification
shouldSuppressGenericChatErrorNotice: (prev: ChatError | null) => boolean;
Expand All @@ -115,7 +113,6 @@ export function useConversationHistory({
previousConversationIdRef,
contextWindowUsageByConversationRef,
dismissedSurfaceIdsRef,
needsNewBubbleRef,
streamingMessageIdsRef,
pendingQueuedStableIdsRef,
requestIdToStableIdRef,
Expand All @@ -132,7 +129,6 @@ export function useConversationHistory({
setSuggestion,
setCompactionCircuitOpenUntil,
resetChatAttachments,
syncNeedsNewBubbleFromMessages,
shouldSuppressGenericChatErrorNotice,
}: UseConversationHistoryParams): ConversationHistoryResult {
// -------------------------------------------------------------------------
Expand All @@ -156,7 +152,6 @@ export function useConversationHistory({
activeConversationId,
draftConversationIdResolutionRef,
previousConversationIdRef,
needsNewBubbleRef,
streamingMessageIdsRef,
pendingQueuedStableIdsRef,
requestIdToStableIdRef,
Expand Down Expand Up @@ -224,7 +219,6 @@ export function useConversationHistory({
prev,
filteredMessages,
);
syncNeedsNewBubbleFromMessages(nextMessages);
return nextMessages;
});
setTranscriptPagination({
Expand Down Expand Up @@ -369,7 +363,6 @@ export function useConversationHistory({
activeConversationId,
dismissedSurfaceIdsRef,
autoGreetRef,
syncNeedsNewBubbleFromMessages,
setMessages,
setTranscriptPagination,
setIsLoadingHistory,
Expand Down
6 changes: 2 additions & 4 deletions apps/web/src/domains/chat/hooks/use-conversation-switch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ export interface UseConversationSwitchParams {
// Refs owned by the parent that the reset clears or refreshes.
draftConversationIdResolutionRef: MutableRefObject<boolean>;
previousConversationIdRef: MutableRefObject<string | null>;
needsNewBubbleRef: MutableRefObject<boolean>;
streamingMessageIdsRef: MutableRefObject<Set<string>>;
pendingQueuedStableIdsRef: MutableRefObject<string[]>;
requestIdToStableIdRef: MutableRefObject<Map<string, string>>;
Expand Down Expand Up @@ -87,7 +86,6 @@ export function useConversationSwitch({
activeConversationId,
draftConversationIdResolutionRef,
previousConversationIdRef,
needsNewBubbleRef,
streamingMessageIdsRef,
pendingQueuedStableIdsRef,
requestIdToStableIdRef,
Expand Down Expand Up @@ -143,7 +141,8 @@ export function useConversationSwitch({
// Reset all per-conversation state so nothing leaks between threads.
useTurnStore.getState().resetTurn();
setIsLoadingHistory(true);
needsNewBubbleRef.current = true;
// `setMessages([])` makes the tail derivation return "create new bubble"
// for any subsequent stream event — no separate latch needed.
Comment on lines +144 to +145

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delete this comment

setMessages([]);
streamingMessageIdsRef.current.clear();
pendingQueuedStableIdsRef.current = [];
Expand Down Expand Up @@ -187,7 +186,6 @@ export function useConversationSwitch({
previousConversationIdRef,
contextWindowUsageByConversationRef,
dismissedSurfaceIdsRef,
needsNewBubbleRef,
streamingMessageIdsRef,
pendingQueuedStableIdsRef,
requestIdToStableIdRef,
Expand Down
Loading