Skip to content

streaming: server-authoritative message identity + idempotent client reducer#32466

Closed
TirmanSidhu wants to merge 6 commits into
mainfrom
TirmanSidhu/streaming-message-architecture
Closed

streaming: server-authoritative message identity + idempotent client reducer#32466
TirmanSidhu wants to merge 6 commits into
mainfrom
TirmanSidhu/streaming-message-architecture

Conversation

@TirmanSidhu
Copy link
Copy Markdown
Contributor

Summary

Re-architects the daemon→macOS streaming pipeline so the same-message-rendered-twice mid-stream bug is structurally unrepresentable instead of chased through ad-hoc dedupe.

Before: daemon emitted anonymous assistant_text_delta events; client used a lazy "find the bubble for currentAssistantMessageId, else create a new one" path. Three failure modes (double subscriber window during loop restart, stale-id forking after handoff, reconnect replay) all routed through that "else create new" branch.

After:

  • Daemon assigns messageId (UUIDv7) at message open, not at persist. Every event carries (messageId, blockIndex, seq). Events are persisted to a durable SQLite log; SSE supports Last-Event-Id replay on reconnect.
  • Client runs a single MessageStreamReducer keyed by messageId, idempotent via per-(messageId, blockIndex) seq watermarks. Renderer reads from MessageStore.
  • EventStreamClient fan-out collapsed into 11 typed per-domain dispatchers with bounded buffers. startMessageLoop/messageLoopTask/messageLoopGeneration removed.

PRs merged into this branch

Where the fix actually lands

PR #32454. The renderer now reads from MessageStore; renderedMessages drops legacy lazy-created bubbles that lack a daemonMessageId. The MessageStore snapshot is authoritative.

Scope notes

  • PR streaming: retire legacy vocabulary #32462 narrowed: it removed the TranscriptProjector dedupe-by-id and updated wire-contract docs, but did NOT delete currentAssistantMessageId, streamingDeltaBuffer, partialOutputBuffer, clearCurrentTurnTracking, appendTextToCurrentMessage, or the legacy messages array. The agent judged those entanglements (~160 call sites; load-bearing semantics beyond text streaming — tool result anchoring, surface widget routing, send-coordination, watchdog recovery) too risky for a single PR. Daemon-side message_complete emission was also kept because CLI/voice/Slack consumers still need it.
  • The structural fix is in place because the renderer no longer sources from those legacy helpers — they are dead-write paths from the view's perspective. Removing them is straightforward follow-up.

Test plan

  • Send several messages in quick succession in a fresh conversation, especially the 2nd message right after the 1st — the previously-reproducible duplicate-bubble-mid-stream should be gone.
  • Reload an in-flight conversation mid-stream — message identity should be preserved (no re-creation as new bubbles).
  • Reconnect the SSE stream (kill daemon, restart) mid-turn — replay should be exact, no duplicates.
  • History reload of a long conversation should render identically to streaming.
  • Multi-step skill turns with generation_handoff — second turn's text should not fork into a new bubble.

🤖 Generated with Claude Code

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 36a80a28f1

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

subscriptionTask?.cancel()
subscriptionTask = Task { [weak self] in
guard let self else { return }
let stream = self.eventStreamClient.subscribeChatEvents()
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Filter reducer events to the active conversation

Because subscribeChatEvents() now delivers the global chat category, this reducer applies assistant lifecycle/delta events from every conversation without the belongsToConversation guard that the legacy ChatActionHandler still uses. When another conversation streams (for example a pop-out window, background task, or another active VM), its message_open/delta events are inserted into this VM's messageStore, and renderedMessages will append those snapshots into the wrong transcript.

Useful? React with 👍 / 👎.

Comment on lines +169 to +171
let (stream, continuation) = AsyncStream<ServerMessage>.makeStream(
bufferingPolicy: .bufferingNewest(eventStreamSubscriberBufferLimit)
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Do not drop buffered streaming deltas

Using .bufferingNewest(256) silently discards older chat events whenever the main-actor consumer falls behind a long or fast assistant/tool stream. Chat messages use incremental assistant_text_delta and tool_input_delta payloads rather than snapshots, so dropping an older buffered event permanently removes part of the transcript (and no reconnect/replay is triggered because the SSE connection itself stayed healthy).

Useful? React with 👍 / 👎.

@TirmanSidhu TirmanSidhu deleted the TirmanSidhu/streaming-message-architecture branch May 28, 2026 20:52
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant