diff --git a/apps/web/src/domains/chat/chat-page.tsx b/apps/web/src/domains/chat/chat-page.tsx index 98cc29265e..03f83d72a4 100644 --- a/apps/web/src/domains/chat/chat-page.tsx +++ b/apps/web/src/domains/chat/chat-page.tsx @@ -257,7 +257,6 @@ export function ChatPage() { const streamEpochRef = useRef(0); const streamContextRef = useRef<{ assistantId: string; conversationId: string } | null>(null); const reconcileAfterNextStreamOpenRef = useRef(false); - const needsNewBubbleRef = useRef(true); const dismissedSurfaceIdsRef = useRef>(new Set()); const pendingOnboardingContextRef = useRef(null); const onboardingDraftConversationIdRef = useRef(null); @@ -381,11 +380,6 @@ export function ChatPage() { ); const isChannelReadonly = isChannelConversation(activeConversation); - const syncNeedsNewBubbleFromMessages = useCallback((nextMessages: DisplayMessage[]) => { - const lastMsg = nextMessages[nextMessages.length - 1]; - needsNewBubbleRef.current = !lastMsg || lastMsg.role !== "assistant" || !lastMsg.isStreaming; - }, []); - // ------------------------------------------------------------------------- // Conversation loader // ------------------------------------------------------------------------- @@ -414,7 +408,6 @@ export function ChatPage() { activeConversationIdRef, contextWindowUsageByConversationRef, dismissedSurfaceIdsRef, - needsNewBubbleRef, streamingMessageIdsRef, pendingQueuedStableIdsRef, requestIdToStableIdRef, @@ -434,7 +427,6 @@ export function ChatPage() { setSuggestion, setCompactionCircuitOpenUntil, resetChatAttachments, - syncNeedsNewBubbleFromMessages, shouldSuppressGenericChatErrorNotice, }); @@ -567,7 +559,6 @@ export function ChatPage() { assistantIdRef, setMessages, messagesRef, - needsNewBubbleRef, setError, streamRef, cancelReconciliation, @@ -610,7 +601,6 @@ export function ChatPage() { streamRef, streamContextRef, streamEpochRef, - needsNewBubbleRef, dismissedSurfaceIdsRef, pendingOnboardingContextRef, onboardingDraftConversationIdRef, diff --git a/apps/web/src/domains/chat/hooks/stream-message-updaters.test.ts b/apps/web/src/domains/chat/hooks/stream-message-updaters.test.ts index 699edbe853..1475279512 100644 --- a/apps/web/src/domains/chat/hooks/stream-message-updaters.test.ts +++ b/apps/web/src/domains/chat/hooks/stream-message-updaters.test.ts @@ -86,17 +86,35 @@ describe("appendTextDelta", () => { expect(last.content).toBe("Hello"); }); - it("returns prev unchanged if last message is not streaming assistant", () => { - const msg = makeAssistantMsg({ isStreaming: false }); - const prev = [userMsg, msg]; - const result = appendTextDelta(prev, "text"); - expect(result).toBe(prev); + it("creates a new streaming bubble when last message is a finalized assistant", () => { + // Old behavior was a no-op; new behavior opens a fresh bubble so the + // text-delta isn't dropped on the floor. This is the bubble-creation + // path that used to be gated by `needsNewBubbleRef`. + const finalized = makeAssistantMsg({ isStreaming: false }); + const result = appendTextDelta([userMsg, finalized], "text"); + + expect(result).toHaveLength(3); + expect(result[2]!.role).toBe("assistant"); + expect(result[2]!.isStreaming).toBe(true); + expect(result[2]!.content).toBe("text"); }); - it("returns prev unchanged if last message is a user message", () => { - const prev = [userMsg]; - const result = appendTextDelta(prev, "text"); - expect(result).toBe(prev); + it("creates a new streaming bubble when last message is a user message", () => { + // Initial assistant turn (no prior assistant bubble at all) — first + // text delta must spawn the bubble rather than no-op. + const result = appendTextDelta([userMsg], "text"); + + expect(result).toHaveLength(2); + expect(result[1]!.role).toBe("assistant"); + expect(result[1]!.isStreaming).toBe(true); + expect(result[1]!.content).toBe("text"); + }); + + it("uses stableId when creating a new bubble", () => { + const result = appendTextDelta([userMsg], "text", "stable-xyz"); + + expect(result).toHaveLength(2); + expect(result[1]!.id).toBe("stable-xyz"); }); it("does not mutate the original array", () => { @@ -199,9 +217,9 @@ describe("upsertToolCall", () => { status: "running" as const, }; - it("appends tool call to existing streaming message", () => { + it("appends tool call to existing streaming assistant tail", () => { const msg = makeAssistantMsg({ toolCalls: undefined }); - const result = upsertToolCall([userMsg, msg], toolCall, false); + const result = upsertToolCall([userMsg, msg], toolCall); expect(result).toHaveLength(2); expect(result[1]!.toolCalls).toHaveLength(1); @@ -214,15 +232,17 @@ describe("upsertToolCall", () => { toolCalls: [{ id: "tc-1", toolName: "old_name", input: {}, status: "running" as const }], }); const updatedTc = { id: "tc-1", toolName: "web_search", input: {} as Record, status: "running" as const }; - const result = upsertToolCall([msg], updatedTc, false); + const result = upsertToolCall([msg], updatedTc); expect(result[0]!.toolCalls).toHaveLength(1); expect(result[0]!.toolCalls![0]!.toolName).toBe("web_search"); }); - it("creates new bubble when shouldCreateNewBubble is true", () => { - const msg = makeAssistantMsg(); - const result = upsertToolCall([userMsg, msg], toolCall, true); + it("creates a new bubble when the tail is a finalized assistant", () => { + // Finalized assistant tail (isStreaming: false) → derivation says + // "open a fresh bubble" rather than extend the previous turn. + const finalized = makeAssistantMsg({ isStreaming: false }); + const result = upsertToolCall([userMsg, finalized], toolCall); expect(result).toHaveLength(3); expect(result[2]!.role).toBe("assistant"); @@ -230,8 +250,8 @@ describe("upsertToolCall", () => { expect(result[2]!.toolCalls).toHaveLength(1); }); - it("creates new bubble when no streaming assistant message exists", () => { - const result = upsertToolCall([userMsg], toolCall, false); + it("creates a new bubble when no streaming assistant tail exists", () => { + const result = upsertToolCall([userMsg], toolCall); expect(result).toHaveLength(2); expect(result[1]!.role).toBe("assistant"); @@ -241,7 +261,7 @@ describe("upsertToolCall", () => { it("does not mutate existing messages", () => { const msg = makeAssistantMsg({ toolCalls: [] }); const prev = [msg]; - upsertToolCall(prev, toolCall, false); + upsertToolCall(prev, toolCall); expect(prev[0]!.toolCalls).toHaveLength(0); }); }); @@ -411,15 +431,29 @@ describe("finalizeOnIdle", () => { expect(result).toBe(prev); }); - it("returns prev unchanged when streaming messages have no running tool calls", () => { + it("flips isStreaming to false even when streaming messages have no running tool calls", () => { + // New behavior (replaces what `needsNewBubbleRef` used to carry): the + // tail must transition out of "streaming" state on idle regardless of + // tool-call presence, so the next chunk derives "open a new bubble". const msg = makeAssistantMsg({ toolCalls: [ { id: "tc-1", toolName: "web_search", input: {}, status: "completed" }, ], }); - const prev = [msg]; - const result = finalizeOnIdle(prev); - expect(result).toBe(prev); + const result = finalizeOnIdle([msg]); + + expect(result).toHaveLength(1); + expect(result[0]!.isStreaming).toBe(false); + // Already-completed tool calls remain untouched. + expect(result[0]!.toolCalls![0]!.status).toBe("completed"); + }); + + it("flips isStreaming to false on a streaming assistant with no tool calls at all", () => { + const msg = makeAssistantMsg({ toolCalls: undefined }); + const result = finalizeOnIdle([msg]); + + expect(result).toHaveLength(1); + expect(result[0]!.isStreaming).toBe(false); }); it("does not modify non-streaming assistant messages", () => { diff --git a/apps/web/src/domains/chat/hooks/stream-message-updaters.ts b/apps/web/src/domains/chat/hooks/stream-message-updaters.ts index 4c3c856c79..8ba790e5c0 100644 --- a/apps/web/src/domains/chat/hooks/stream-message-updaters.ts +++ b/apps/web/src/domains/chat/hooks/stream-message-updaters.ts @@ -31,6 +31,20 @@ export function finalizeRunningToolCalls( ); } +/** + * Whether the next streaming chunk should extend the tail bubble or start + * a fresh one. Derived directly from the message array — the boundary + * events that previously latched this decision (idle, message_complete, + * generation_handoff, generation_cancelled, dequeued, conversation switch) + * all leave the tail in a state where `isStreaming` is either `false` or + * the tail is no longer an assistant row, so the derivation answers + * correctly without any shared flag. + */ +export function tailIsStreamingAssistant(prev: DisplayMessage[]): boolean { + const last = prev[prev.length - 1]; + return !!last && last.role === "assistant" && !!last.isStreaming; +} + // --------------------------------------------------------------------------- // assistant_text_delta // --------------------------------------------------------------------------- @@ -58,15 +72,28 @@ export function createStreamingBubble( ]; } -/** Append text to the last streaming assistant bubble. */ +/** + * Append text to the streaming assistant tail bubble, creating one if the + * tail isn't a streaming assistant row. + * + * The "should I open a new bubble" question is derived from the message + * array itself — when the previous turn finalized (via `finalizeOnIdle`, + * `finalizeMessageComplete`, `stopStreaming`, conversation switch, or a + * user message append), the tail's `isStreaming` flag is already false + * (or the tail is no longer an assistant row), so this updater branches + * to `createStreamingBubble` without needing a shared latch. + */ export function appendTextDelta( prev: DisplayMessage[], text: string, messageId?: string, + stableId?: string, ): DisplayMessage[] { - const last = prev[prev.length - 1]; - if (!last || last.role !== "assistant" || !last.isStreaming) return prev; + if (!tailIsStreamingAssistant(prev)) { + return createStreamingBubble(prev, text, messageId, stableId); + } + const last = prev[prev.length - 1]!; const segments = [...(last.textSegments ?? [])]; const order = [...(last.contentOrder ?? [])]; const lastOrderEntry = order[order.length - 1]; @@ -102,14 +129,28 @@ export function appendTextDelta( // assistant_activity_state (idle) // --------------------------------------------------------------------------- -/** Finalize running tool calls on ALL streaming assistant messages (idle signal). */ +/** + * Finalize all streaming assistant messages when the daemon signals turn + * idle. Sets `isStreaming: false` on each streaming assistant row and + * marks any running tool calls as completed. + * + * Flipping `isStreaming` here is what lets `appendTextDelta` / + * `upsertToolCall` derive "next chunk should open a new bubble" from the + * message array alone — without this, the previous turn's tail would + * still look like a streaming assistant when the next turn's first chunk + * arrives, and the next chunk would erroneously extend it. + */ export function finalizeOnIdle(prev: DisplayMessage[]): DisplayMessage[] { let changed = false; const updated = prev.map((m) => { if (m.role !== "assistant" || !m.isStreaming) return m; - if (!m.toolCalls?.some((tc) => tc.status === "running")) return m; changed = true; - return { ...m, toolCalls: finalizeRunningToolCalls(m.toolCalls)! }; + const finalized = finalizeRunningToolCalls(m.toolCalls); + return { + ...m, + isStreaming: false, + ...(finalized ? { toolCalls: finalized } : {}), + }; }); return changed ? updated : prev; } @@ -389,21 +430,23 @@ export function completeSurface( // tool_use_start // --------------------------------------------------------------------------- -/** Insert or update a tool call on an assistant message. */ +/** + * Insert or update a tool call on the streaming assistant tail bubble, + * creating a new bubble if the tail isn't a streaming assistant row. + * + * The bubble-creation decision is derived from `prev` itself — no shared + * latch passes through. Same finalization invariant as `appendTextDelta`: + * boundary events leave the tail with `isStreaming: false` (or non- + * assistant), so this updater opens a fresh bubble correctly. + */ export function upsertToolCall( prev: DisplayMessage[], toolCall: ChatMessageToolCall, - shouldCreateNewBubble: boolean, stableId?: string, ): DisplayMessage[] { - const lastIdx = prev.length - 1; - const last = prev[lastIdx]; - - if ( - !shouldCreateNewBubble && - last?.role === "assistant" && - last.isStreaming - ) { + if (tailIsStreamingAssistant(prev)) { + const lastIdx = prev.length - 1; + const last = prev[lastIdx]!; const existingIdx = last.toolCalls?.findIndex((tc) => tc.id === toolCall.id) ?? -1; if (existingIdx !== -1) { diff --git a/apps/web/src/domains/chat/hooks/use-conversation-history.ts b/apps/web/src/domains/chat/hooks/use-conversation-history.ts index 09f2579463..b2195dcf10 100644 --- a/apps/web/src/domains/chat/hooks/use-conversation-history.ts +++ b/apps/web/src/domains/chat/hooks/use-conversation-history.ts @@ -72,7 +72,6 @@ interface UseConversationHistoryParams { contextWindowUsageByConversationRef: MutableRefObject>; dismissedSurfaceIdsRef: MutableRefObject>; - needsNewBubbleRef: MutableRefObject; streamingMessageIdsRef: MutableRefObject>; pendingQueuedStableIdsRef: MutableRefObject; requestIdToStableIdRef: MutableRefObject>; @@ -93,7 +92,6 @@ interface UseConversationHistoryParams { // Callbacks resetChatAttachments: () => void; - syncNeedsNewBubbleFromMessages: (nextMessages: DisplayMessage[]) => void; // Error classification shouldSuppressGenericChatErrorNotice: (prev: ChatError | null) => boolean; @@ -115,7 +113,6 @@ export function useConversationHistory({ previousConversationIdRef, contextWindowUsageByConversationRef, dismissedSurfaceIdsRef, - needsNewBubbleRef, streamingMessageIdsRef, pendingQueuedStableIdsRef, requestIdToStableIdRef, @@ -132,7 +129,6 @@ export function useConversationHistory({ setSuggestion, setCompactionCircuitOpenUntil, resetChatAttachments, - syncNeedsNewBubbleFromMessages, shouldSuppressGenericChatErrorNotice, }: UseConversationHistoryParams): ConversationHistoryResult { // ------------------------------------------------------------------------- @@ -156,7 +152,6 @@ export function useConversationHistory({ activeConversationId, draftConversationIdResolutionRef, previousConversationIdRef, - needsNewBubbleRef, streamingMessageIdsRef, pendingQueuedStableIdsRef, requestIdToStableIdRef, @@ -224,7 +219,6 @@ export function useConversationHistory({ prev, filteredMessages, ); - syncNeedsNewBubbleFromMessages(nextMessages); return nextMessages; }); setTranscriptPagination({ @@ -369,7 +363,6 @@ export function useConversationHistory({ activeConversationId, dismissedSurfaceIdsRef, autoGreetRef, - syncNeedsNewBubbleFromMessages, setMessages, setTranscriptPagination, setIsLoadingHistory, diff --git a/apps/web/src/domains/chat/hooks/use-conversation-switch.ts b/apps/web/src/domains/chat/hooks/use-conversation-switch.ts index e524a8f05f..181fb51a79 100644 --- a/apps/web/src/domains/chat/hooks/use-conversation-switch.ts +++ b/apps/web/src/domains/chat/hooks/use-conversation-switch.ts @@ -40,7 +40,6 @@ export interface UseConversationSwitchParams { // Refs owned by the parent that the reset clears or refreshes. draftConversationIdResolutionRef: MutableRefObject; previousConversationIdRef: MutableRefObject; - needsNewBubbleRef: MutableRefObject; streamingMessageIdsRef: MutableRefObject>; pendingQueuedStableIdsRef: MutableRefObject; requestIdToStableIdRef: MutableRefObject>; @@ -87,7 +86,6 @@ export function useConversationSwitch({ activeConversationId, draftConversationIdResolutionRef, previousConversationIdRef, - needsNewBubbleRef, streamingMessageIdsRef, pendingQueuedStableIdsRef, requestIdToStableIdRef, @@ -143,7 +141,8 @@ export function useConversationSwitch({ // Reset all per-conversation state so nothing leaks between threads. useTurnStore.getState().resetTurn(); setIsLoadingHistory(true); - needsNewBubbleRef.current = true; + // `setMessages([])` makes the tail derivation return "create new bubble" + // for any subsequent stream event — no separate latch needed. setMessages([]); streamingMessageIdsRef.current.clear(); pendingQueuedStableIdsRef.current = []; @@ -187,7 +186,6 @@ export function useConversationSwitch({ previousConversationIdRef, contextWindowUsageByConversationRef, dismissedSurfaceIdsRef, - needsNewBubbleRef, streamingMessageIdsRef, pendingQueuedStableIdsRef, requestIdToStableIdRef, diff --git a/apps/web/src/domains/chat/hooks/use-send-message.ts b/apps/web/src/domains/chat/hooks/use-send-message.ts index b2c9300003..3f01ae8310 100644 --- a/apps/web/src/domains/chat/hooks/use-send-message.ts +++ b/apps/web/src/domains/chat/hooks/use-send-message.ts @@ -98,7 +98,6 @@ interface UseSendMessageParams { conversationId: string; } | null>; streamEpochRef: MutableRefObject; - needsNewBubbleRef: MutableRefObject; dismissedSurfaceIdsRef: MutableRefObject>; pendingOnboardingContextRef: MutableRefObject; onboardingDraftConversationIdRef: MutableRefObject; @@ -138,7 +137,6 @@ export function useSendMessage({ streamRef, streamContextRef, streamEpochRef, - needsNewBubbleRef, dismissedSurfaceIdsRef, pendingOnboardingContextRef, onboardingDraftConversationIdRef, @@ -490,7 +488,6 @@ export function useSendMessage({ setMessages((prev) => clearQueueStatus(prev, userMessage.stableId), ); - needsNewBubbleRef.current = true; const fallbackTurnId = newTurnId(); useTurnStore.getState().requestSend(fallbackTurnId); useTurnStore.getState().acceptSend(fallbackTurnId); @@ -541,7 +538,6 @@ export function useSendMessage({ } cancelReconciliation(); - needsNewBubbleRef.current = true; const isDraft = !currentConv; let resolvedId: string | undefined; @@ -608,7 +604,6 @@ export function useSendMessage({ streamEpochRef.current++; useTurnStore.getState().cancelGeneration(); setMessages(stopStreamingAndClearConfirmations); - needsNewBubbleRef.current = true; useInteractionStore.getState().resetAll(); useSubagentStore.getState().reset(); confirmationToolCallMapRef.current.clear(); diff --git a/apps/web/src/domains/chat/hooks/use-stream-event-handler-guard.test.tsx b/apps/web/src/domains/chat/hooks/use-stream-event-handler-guard.test.tsx index bab3fc5bf5..e97b3faf9d 100644 --- a/apps/web/src/domains/chat/hooks/use-stream-event-handler-guard.test.tsx +++ b/apps/web/src/domains/chat/hooks/use-stream-event-handler-guard.test.tsx @@ -55,7 +55,6 @@ function noopRefs() { } as MutableRefObject<{ assistantId: string; conversationId: string } | null>, assistantIdRef: { current: "asst-1" } as MutableRefObject, messagesRef: { current: [] } as MutableRefObject, - needsNewBubbleRef: { current: false } as MutableRefObject, streamRef: { current: null } as MutableRefObject, confirmationToolCallMapRef: { current: new Map() } as MutableRefObject< Map diff --git a/apps/web/src/domains/chat/hooks/use-stream-event-handler.ts b/apps/web/src/domains/chat/hooks/use-stream-event-handler.ts index b85f0a34fb..2fa7eea751 100644 --- a/apps/web/src/domains/chat/hooks/use-stream-event-handler.ts +++ b/apps/web/src/domains/chat/hooks/use-stream-event-handler.ts @@ -112,7 +112,6 @@ export interface UseStreamEventHandlerParams { // --- Messages --- setMessages: Dispatch>; messagesRef: MutableRefObject; - needsNewBubbleRef: MutableRefObject; // --- Error & stream lifecycle --- setError: Dispatch>; @@ -188,7 +187,6 @@ export function useStreamEventHandler( assistantIdRef, setMessages, messagesRef, - needsNewBubbleRef, setError, streamRef, cancelReconciliation, @@ -277,10 +275,15 @@ export function useStreamEventHandler( return; } } - if ( - event.type !== "assistant_text_delta" || - needsNewBubbleRef.current - ) { + // Suppress per-chunk text_delta noise — only log the first delta of a + // new bubble. "First delta" is derived from the message ref tail: + // if the tail isn't a streaming assistant, the next text_delta will + // open a fresh bubble. This replaces the previous `needsNewBubbleRef` + // latch with a tail-derivation read. + const tail = messagesRef.current[messagesRef.current.length - 1]; + const tailIsStreaming = + !!tail && tail.role === "assistant" && !!tail.isStreaming; + if (event.type !== "assistant_text_delta" || !tailIsStreaming) { recordChatDiagnostic( event.type === "assistant_text_delta" ? "sse_assistant_text_delta_start" @@ -303,7 +306,6 @@ export function useStreamEventHandler( assistantIdRef, setMessages, messagesRef, - needsNewBubbleRef, turnActions: useTurnStore.getState(), getTurnState: () => useTurnStore.getState(), clearProcessingKey, @@ -496,7 +498,6 @@ export function useStreamEventHandler( assistantIdRef, setMessages, messagesRef, - needsNewBubbleRef, setError, streamRef, confirmationToolCallMapRef, diff --git a/apps/web/src/domains/chat/utils/reconcile.test.ts b/apps/web/src/domains/chat/utils/reconcile.test.ts index d9fc497d58..1d4431df55 100644 --- a/apps/web/src/domains/chat/utils/reconcile.test.ts +++ b/apps/web/src/domains/chat/utils/reconcile.test.ts @@ -505,8 +505,8 @@ describe("reconcileMessages", () => { // THEN the streaming assistant message is preserved AS-IS, including its // `isStreaming` flag. A reconcile that lands while the turn is still // streaming must not flip the live bubble to "completed" — that would - // cause downstream consumers (e.g. `syncNeedsNewBubbleFromMessages`) to - // spawn a fresh bubble on the next tool_use_start, splitting the turn. + // make the bubble-creation tail derivation in stream-message-updaters + // open a fresh bubble on the next tool_use_start, splitting the turn. expect(result).toHaveLength(2); expect(result[1]).toMatchObject({ role: "assistant", @@ -843,11 +843,11 @@ describe("reconcileMessages — mid-stream sync-tag bubble-split regression", () // was preserved, but the streaming flag was lost — producing the // exact bug fingerprint: `isStreaming:false` + tool call // `status:"running"` on the same row. - // 5. `syncNeedsNewBubbleFromMessages` then flipped - // `needsNewBubbleRef = true` based on `lastMsg.isStreaming` alone, - // so the next `tool_use_start` event spawned a fresh - // `assistant-tool-*` bubble — splitting the turn and injecting the - // timestamp footer between the two halves. + // 5. The bubble-creation tail derivation in stream-message-updaters + // then saw `lastMsg.isStreaming === false` and (correctly, given + // the corrupted state) opened a fresh `assistant-tool-*` bubble on + // the next `tool_use_start` event — splitting the turn and + // injecting the timestamp footer between the two halves. // // Contract this test pins down: `reconcileMessages` MUST preserve the // client-owned `isStreaming` flag. SSE `message_complete` and the diff --git a/apps/web/src/domains/chat/utils/stream-handlers/message-handlers.test.ts b/apps/web/src/domains/chat/utils/stream-handlers/message-handlers.test.ts index 692d3e0b73..eb971421d7 100644 --- a/apps/web/src/domains/chat/utils/stream-handlers/message-handlers.test.ts +++ b/apps/web/src/domains/chat/utils/stream-handlers/message-handlers.test.ts @@ -21,14 +21,25 @@ describe("handleAssistantTextDelta", () => { expect(ctx.setMessages).toHaveBeenCalled(); }); - it("creates a new bubble when needsNewBubbleRef is true", () => { - const ctx = makeCtx({ needsNewBubbleRef: { current: true } }); + it("creates a new bubble when the tail is not a streaming assistant", () => { + // Empty messages → tail derivation says "create new bubble". + const ctx = makeCtx(); handleAssistantTextDelta( { type: "assistant_text_delta", text: "Hi" }, ctx, ); - expect(ctx.needsNewBubbleRef.current).toBe(false); expect(ctx.setMessages).toHaveBeenCalled(); + // Apply the updater to an empty array to confirm a new bubble emerges. + const updater = (ctx.setMessages as unknown as ReturnType).mock.calls[0][0] as ( + prev: never[], + ) => unknown[]; + const next = updater([]); + expect(next).toHaveLength(1); + expect(next[0]).toMatchObject({ + role: "assistant", + isStreaming: true, + content: "Hi", + }); }); }); @@ -68,7 +79,6 @@ describe("handleAssistantActivityState", () => { ); expect(ctx.lastActivityVersionRef.current.get("conv-1")).toBe(1); expect(ctx.setMessages).toHaveBeenCalled(); - expect(ctx.needsNewBubbleRef.current).toBe(true); expect(ctx.turnActions.completeTurn).toHaveBeenCalled(); expect(ctx.clearProcessingKey).toHaveBeenCalledWith("conv-1"); expect(ctx.startReconciliationLoop).toHaveBeenCalledWith(1); @@ -142,7 +152,6 @@ describe("handleMessageComplete", () => { ctx, ); expect(ctx.setMessages).toHaveBeenCalled(); - expect(ctx.needsNewBubbleRef.current).toBe(true); expect(ctx.turnActions.completeTurn).toHaveBeenCalled(); expect(ctx.clearProcessingKey).toHaveBeenCalledWith("conv-1"); expect(ctx.startReconciliationLoop).toHaveBeenCalledWith(1); @@ -150,7 +159,7 @@ describe("handleMessageComplete", () => { }); describe("handleGenerationHandoff", () => { - it("cancels reconciliation and sets needsNewBubble", () => { + it("cancels reconciliation and finalizes streaming tail", () => { const ctx = makeCtx(); handleGenerationHandoff( { type: "generation_handoff", messageId: "msg-1" }, @@ -158,7 +167,7 @@ describe("handleGenerationHandoff", () => { ); expect(ctx.cancelReconciliation).toHaveBeenCalled(); expect(ctx.turnActions.handoffGeneration).toHaveBeenCalled(); - expect(ctx.needsNewBubbleRef.current).toBe(true); + expect(ctx.setMessages).toHaveBeenCalled(); }); }); @@ -169,6 +178,5 @@ describe("handleGenerationCancelled", () => { expect(ctx.turnActions.cancelGeneration).toHaveBeenCalled(); expect(ctx.clearProcessingKey).toHaveBeenCalledWith("conv-1"); expect(ctx.setMessages).toHaveBeenCalled(); - expect(ctx.needsNewBubbleRef.current).toBe(true); }); }); diff --git a/apps/web/src/domains/chat/utils/stream-handlers/message-handlers.ts b/apps/web/src/domains/chat/utils/stream-handlers/message-handlers.ts index 226c35c9ab..8b8015bed3 100644 --- a/apps/web/src/domains/chat/utils/stream-handlers/message-handlers.ts +++ b/apps/web/src/domains/chat/utils/stream-handlers/message-handlers.ts @@ -1,8 +1,6 @@ import { recordChatDiagnostic } from "@/domains/chat/utils/diagnostics.js"; -import { newStableId } from "@/domains/chat/utils/stable-id.js"; import { appendTextDelta, - createStreamingBubble, finalizeMessageComplete, finalizeOnIdle, stopStreaming, @@ -17,18 +15,17 @@ export function handleAssistantTextDelta( ): void { ctx.cancelReconciliation(); ctx.turnActions.onTextDelta(); - if (ctx.needsNewBubbleRef.current) { - ctx.needsNewBubbleRef.current = false; - const stableId = newStableId("assistant-stream"); - ctx.currentAssistantStableIdRef.current = stableId; - ctx.setMessages((prev) => - createStreamingBubble(prev, event.text, event.messageId, stableId), - ); - } else { - ctx.setMessages((prev) => - appendTextDelta(prev, event.text, event.messageId), - ); - } + ctx.setMessages((prev) => { + const next = appendTextDelta(prev, event.text, event.messageId); + const tail = next[next.length - 1]; + // Stamp the stable-id ref to the streaming tail (no-op for extends; new + // id for creates). Subagent handlers read this to attribute nested + // notifications to the right parent bubble. + if (tail?.role === "assistant" && tail.isStreaming) { + ctx.currentAssistantStableIdRef.current = tail.stableId; + } + return next; + }); } export function handleAssistantActivityState( @@ -75,7 +72,6 @@ export function handleAssistantActivityState( } ctx.setMessages(finalizeOnIdle); - ctx.needsNewBubbleRef.current = true; const turnPhaseBefore = ctx.getTurnState().phase; ctx.turnActions.completeTurn(); if (convId) { @@ -106,7 +102,6 @@ export function handleMessageComplete( attachments: completedAttachments, }), ); - ctx.needsNewBubbleRef.current = true; const turnPhaseBefore = ctx.getTurnState().phase; ctx.turnActions.completeTurn(); const convId = ctx.streamContextRef.current?.conversationId; @@ -136,7 +131,6 @@ export function handleGenerationHandoff( rowMessageId: event.messageId, }), ); - ctx.needsNewBubbleRef.current = true; } export function handleGenerationCancelled( @@ -149,5 +143,4 @@ export function handleGenerationCancelled( ctx.clearProcessingKey(convId); } ctx.setMessages((prev) => stopStreaming(prev)); - ctx.needsNewBubbleRef.current = true; } diff --git a/apps/web/src/domains/chat/utils/stream-handlers/queue-handlers.test.ts b/apps/web/src/domains/chat/utils/stream-handlers/queue-handlers.test.ts index 18a5baa327..b5d9a35d4d 100644 --- a/apps/web/src/domains/chat/utils/stream-handlers/queue-handlers.test.ts +++ b/apps/web/src/domains/chat/utils/stream-handlers/queue-handlers.test.ts @@ -48,7 +48,7 @@ describe("handleMessageQueued", () => { }); describe("handleMessageDequeued", () => { - it("clears queue status and sets needsNewBubble", () => { + it("clears queue status when stableId mapping exists", () => { const ctx = makeCtx(); ctx.requestIdToStableIdRef.current.set("req-1", "stable-1"); handleMessageDequeued( @@ -58,7 +58,6 @@ describe("handleMessageDequeued", () => { expect(ctx.turnActions.dequeueMessage).toHaveBeenCalled(); expect(ctx.requestIdToStableIdRef.current.has("req-1")).toBe(false); expect(ctx.setMessages).toHaveBeenCalled(); - expect(ctx.needsNewBubbleRef.current).toBe(true); }); it("skips setMessages when no stableId mapping exists", () => { @@ -69,7 +68,6 @@ describe("handleMessageDequeued", () => { ); expect(ctx.turnActions.dequeueMessage).toHaveBeenCalled(); expect(ctx.setMessages).not.toHaveBeenCalled(); - expect(ctx.needsNewBubbleRef.current).toBe(true); }); }); diff --git a/apps/web/src/domains/chat/utils/stream-handlers/queue-handlers.ts b/apps/web/src/domains/chat/utils/stream-handlers/queue-handlers.ts index f5a239a90f..22d3fe0113 100644 --- a/apps/web/src/domains/chat/utils/stream-handlers/queue-handlers.ts +++ b/apps/web/src/domains/chat/utils/stream-handlers/queue-handlers.ts @@ -49,7 +49,6 @@ export function handleMessageDequeued( if (dequeuedStableId) { ctx.setMessages((prev) => clearQueueStatus(prev, dequeuedStableId)); } - ctx.needsNewBubbleRef.current = true; } export function handleMessageQueuedDeleted( diff --git a/apps/web/src/domains/chat/utils/stream-handlers/test-helpers.ts b/apps/web/src/domains/chat/utils/stream-handlers/test-helpers.ts index 9d82427902..ac0279580a 100644 --- a/apps/web/src/domains/chat/utils/stream-handlers/test-helpers.ts +++ b/apps/web/src/domains/chat/utils/stream-handlers/test-helpers.ts @@ -20,7 +20,6 @@ export function makeCtx( assistantIdRef: { current: "ast-1" }, setMessages: mock(() => {}), messagesRef: { current: [] }, - needsNewBubbleRef: { current: false }, turnActions: { requestSend: mock(() => {}), acceptSend: mock(() => {}), diff --git a/apps/web/src/domains/chat/utils/stream-handlers/tool-call-handlers.test.ts b/apps/web/src/domains/chat/utils/stream-handlers/tool-call-handlers.test.ts index 8f1134ab3b..99be21fc45 100644 --- a/apps/web/src/domains/chat/utils/stream-handlers/tool-call-handlers.test.ts +++ b/apps/web/src/domains/chat/utils/stream-handlers/tool-call-handlers.test.ts @@ -17,7 +17,6 @@ describe("handleToolUseStart", () => { expect(ctx.cancelReconciliation).toHaveBeenCalled(); expect(ctx.turnActions.onToolUseStart).toHaveBeenCalled(); expect(ctx.toolCallIdCounterRef.current).toBe(1); - expect(ctx.needsNewBubbleRef.current).toBe(false); expect(ctx.setMessages).toHaveBeenCalled(); }); @@ -35,13 +34,22 @@ describe("handleToolUseStart", () => { expect(ctx.toolCallIdCounterRef.current).toBe(0); }); - it("creates new bubble when needsNewBubbleRef is true", () => { - const ctx = makeCtx({ needsNewBubbleRef: { current: true } }); + it("creates a new bubble when the tail is not a streaming assistant", () => { + const ctx = makeCtx(); handleToolUseStart( - { type: "tool_use_start", toolName: "web_search", input: {} }, + { type: "tool_use_start", toolName: "web_search", input: {}, toolUseId: "tc-1" }, ctx, ); - expect(ctx.needsNewBubbleRef.current).toBe(false); + expect(ctx.setMessages).toHaveBeenCalled(); + const updater = (ctx.setMessages as unknown as ReturnType).mock.calls[0][0] as ( + prev: never[], + ) => Array<{ role: string; isStreaming: boolean; toolCalls: Array<{ id: string }> }>; + const next = updater([]); + expect(next).toHaveLength(1); + expect(next[0]?.role).toBe("assistant"); + expect(next[0]?.isStreaming).toBe(true); + expect(next[0]?.toolCalls).toHaveLength(1); + expect(next[0]?.toolCalls[0]?.id).toBe("tc-1"); }); }); diff --git a/apps/web/src/domains/chat/utils/stream-handlers/tool-call-handlers.ts b/apps/web/src/domains/chat/utils/stream-handlers/tool-call-handlers.ts index a743e65eb3..1129111653 100644 --- a/apps/web/src/domains/chat/utils/stream-handlers/tool-call-handlers.ts +++ b/apps/web/src/domains/chat/utils/stream-handlers/tool-call-handlers.ts @@ -1,4 +1,3 @@ -import { newStableId } from "@/domains/chat/utils/stable-id.js"; import { applyToolProgress, applyToolResult, @@ -27,16 +26,16 @@ export function handleToolUseStart( status: "running", startedAt: Date.now(), }; - const shouldCreateNewBubble = ctx.needsNewBubbleRef.current; - ctx.needsNewBubbleRef.current = false; - let stableId: string | undefined; - if (shouldCreateNewBubble) { - stableId = newStableId("assistant-tool"); - ctx.currentAssistantStableIdRef.current = stableId; - } - ctx.setMessages((prev) => - upsertToolCall(prev, newToolCall, shouldCreateNewBubble, stableId), - ); + ctx.setMessages((prev) => { + const next = upsertToolCall(prev, newToolCall); + const tail = next[next.length - 1]; + // Stamp the stable-id ref to the streaming tail (no-op for extends; new + // id for creates). See parallel logic in handleAssistantTextDelta. + if (tail?.role === "assistant" && tail.isStreaming) { + ctx.currentAssistantStableIdRef.current = tail.stableId; + } + return next; + }); } export function handleToolProgress( diff --git a/apps/web/src/domains/chat/utils/stream-handlers/types.ts b/apps/web/src/domains/chat/utils/stream-handlers/types.ts index adc538493a..f2e0b7cbf4 100644 --- a/apps/web/src/domains/chat/utils/stream-handlers/types.ts +++ b/apps/web/src/domains/chat/utils/stream-handlers/types.ts @@ -40,7 +40,6 @@ export interface StreamHandlerContext { // --- Messages --- setMessages: Dispatch>; messagesRef: MutableRefObject; - needsNewBubbleRef: MutableRefObject; // --- Turn state --- turnActions: TurnActions; diff --git a/apps/web/src/domains/conversations/use-conversation-loader.ts b/apps/web/src/domains/conversations/use-conversation-loader.ts index 9a123d87ed..c381b94000 100644 --- a/apps/web/src/domains/conversations/use-conversation-loader.ts +++ b/apps/web/src/domains/conversations/use-conversation-loader.ts @@ -83,7 +83,6 @@ interface UseConversationLoaderParams { activeConversationIdRef: MutableRefObject; contextWindowUsageByConversationRef: MutableRefObject>; dismissedSurfaceIdsRef: MutableRefObject>; - needsNewBubbleRef: MutableRefObject; streamingMessageIdsRef: MutableRefObject>; pendingQueuedStableIdsRef: MutableRefObject; requestIdToStableIdRef: MutableRefObject>; @@ -107,7 +106,6 @@ interface UseConversationLoaderParams { // Callbacks resetChatAttachments: () => void; - syncNeedsNewBubbleFromMessages: (nextMessages: DisplayMessage[]) => void; // Error classification shouldSuppressGenericChatErrorNotice: (prev: ChatError | null) => boolean; @@ -153,7 +151,6 @@ export function useConversationLoader({ activeConversationIdRef, contextWindowUsageByConversationRef, dismissedSurfaceIdsRef, - needsNewBubbleRef, streamingMessageIdsRef, pendingQueuedStableIdsRef, requestIdToStableIdRef, @@ -173,7 +170,6 @@ export function useConversationLoader({ setSuggestion, setCompactionCircuitOpenUntil, resetChatAttachments, - syncNeedsNewBubbleFromMessages, shouldSuppressGenericChatErrorNotice, }: UseConversationLoaderParams) { // ------------------------------------------------------------------------- @@ -456,7 +452,6 @@ export function useConversationLoader({ previousConversationIdRef, contextWindowUsageByConversationRef, dismissedSurfaceIdsRef, - needsNewBubbleRef, streamingMessageIdsRef, pendingQueuedStableIdsRef, requestIdToStableIdRef, @@ -473,7 +468,6 @@ export function useConversationLoader({ setSuggestion, setCompactionCircuitOpenUntil, resetChatAttachments, - syncNeedsNewBubbleFromMessages, shouldSuppressGenericChatErrorNotice, });