feat(agent-loop): partial-persist assistant content mid-turn (B6)#32602
Conversation
Closes the durability gap left by B3: a row reserved at
`llm_call_started` stayed empty for the full duration of a turn,
so a refresh mid-turn rendered an empty assistant bubble where the
in-progress reply should have been.
B6 mirrors streamed text + tool_use blocks into a per-row
accumulator on `EventHandlerState` and flushes via `updateContent`
on a first-fire-wins debounce:
• Time gate: 250ms since last flush (PARTIAL_PERSIST_DEBOUNCE_MS)
• Size gate: 1024 new bytes (PARTIAL_PERSIST_SIZE_THRESHOLD)
`handleTextDelta` appends + schedules; `handleToolUse` flushes
eagerly (tool_use blocks are atomic); `handleMessageComplete`
clears the pending timer before the final authoritative flush, so
a late debounce can't race the indexer/projector.
Indexer + attention projector still fire ONLY at message_complete.
Partial rows are never indexed — memory recall would otherwise
surface mid-turn fragments. Secrets are redacted from text blocks
on every flush, matching the discipline at message_complete.
Failure modes:
• Provider error: B3's orphan cleanup already deletes the row;
its partial content goes with the orphan (new test pins this).
• Retry path: B3's `assistantRowAwaitingFinalization` flag
triggers cleanup before the new reservation; `handleLlmCallStarted`
resets the accumulator for the fresh row.
• Stranded partial flush failure: logged and swallowed — the next
debounce tick or final flush at message_complete recovers.
Thinking blocks are intentionally NOT mirrored mid-turn. The final
`event.message.content` still carries them, so finalized rows
remain authoritative.
Spec: scratch/b6-partial-persist-spec.md
Tests: 7 new (debounce time gate, size gate, eager flush, timer
clear, no mid-turn indexing, redaction, provider-error cleanup).
All 71 tests in conversation-agent-loop.test.ts pass.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: f246d3fd8a
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| name: event.name, | ||
| input: event.input, | ||
| }); | ||
| void flushAccumulatedContent(state, deps); |
There was a problem hiding this comment.
Avoid flushing tool-use snapshots after finalization
In the current AgentLoop.run flow, tool_use events are emitted only after await onEvent({ type: "message_complete", ... }), so this eager partial flush runs after the authoritative final updateContent. For tool-calling assistant messages whose finalized content contains blocks not mirrored in accumulatedContentBlocks (for example thinking blocks, cleaned directive content, or UI surfaces), this later updateContent overwrites the finalized row with the partial accumulator snapshot and drops those blocks from persisted history.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
let's take a look at this
There was a problem hiding this comment.
Confirmed — fixed in 26b1cc6.
Verified the emit order: AgentLoop.run dispatches message_complete (loop.ts ~L1070) strictly before any tool_use (loop.ts ~L1080), so the partial flush from handleToolUse would always overwrite the finalized row and drop thinking blocks, cleaned-directive content, and UI surface blocks that aren't mirrored in accumulatedContentBlocks.
Deleted the eager flush in handleToolUse and replaced it with an inline comment explaining why. Added a new test (handleToolUse does NOT trigger a partial flush of its own) that fires a tool_use event with no prior text delta and asserts zero extra updateMessageContent calls.
There was a problem hiding this comment.
Same point Codex P1 raised — deleted in 26b1cc6.
AgentLoop.run emits tool_use after message_complete (loop.ts L1070 vs L1080), so any flush dispatched from handleToolUse would always overwrite the finalized row and drop thinking blocks, cleaned-directive content, and UI surface blocks that aren't mirrored in accumulatedContentBlocks.
Eager push + void flushAccumulatedContent removed, replaced with an inline comment explaining the emit-order constraint. Added test handleToolUse does NOT trigger a partial flush of its own to lock it in.
| deps: EventHandlerDeps, | ||
| ): void { | ||
| if (state.accumulatedCharsSinceLastFlush >= PARTIAL_PERSIST_SIZE_THRESHOLD) { | ||
| void flushAccumulatedContent(state, deps); |
There was a problem hiding this comment.
Track in-flight partial writes before final update
When the size gate fires, the partial updateContent is started fire-and-forget, so handleMessageComplete can clear only timers, not a write already awaiting the persistence pipeline. If a large text delta triggers this path right before message_complete and the partial write completes after the final write, it can overwrite the authoritative finalized content with the accumulator snapshot, losing finalized-only blocks or cleanup.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Addressed defensively in 26b1cc6.
EventHandlerState now tracks an in-flight pendingPartialFlushPromise for the partial pipeline call. handleMessageComplete awaits it before issuing the authoritative final write, so a partial flush dispatched moments before finalize cannot land after the final update.
I tried to add a regression test for this and pulled it before pushing. defaultPersistenceTerminal (assistant/src/plugins/defaults/persistence.ts:92) calls updateMessageContent(args.messageId, args.content) without await — Bun's SQLite binding is synchronous in prod, so the terminal cannot exercise an async race today. The race window opens only once an async persistence middleware is registered, which is exactly when the in-flight await earns its keep. Happy to wire up a fake async plugin if you'd like the regression coverage anyway.
Two TS errors in conversation-agent-loop.test.ts:
1. tool_result event used snake_case (`tool_use_id`, `is_error`)
instead of the AgentEvent shape's camelCase (`toolUseId`, `isError`).
The snake_case shape is the wire/block form, not the in-process event.
2. `partialBlocks` type didn't allow `input` on tool_use blocks, so
`expect(partialBlocks).toEqual([... { input: { path: '/foo' } }])`
failed type-check. Widened with `input?: Record<string, unknown>`.
|
Type Check failure was deterministic:
Both fixed in 196a2d8. Validated locally — affected test ( Test failure on
Letting CI re-run; flagging as a separate test-isolation cleanup if it sticks. |
| /** Original input from server_tool_start, keyed by tool_use_id, so the complete handler can read the query. */ | ||
| readonly serverToolInputs: Map<string, Record<string, unknown>>; | ||
| /** | ||
| * B6 partial-persistence accumulator. Mirrors the in-progress |
There was a problem hiding this comment.
Don't reference PR numbers in comments
There was a problem hiding this comment.
| * Returns the new total of unflushed bytes so the caller can drive the | ||
| * size gate. | ||
| */ | ||
| function appendTextToAccumulator( |
There was a problem hiding this comment.
I don't like that we are redefining the accumulation logic - Ideally we just reuse whichever want the message complete event is using
There was a problem hiding this comment.
Done in 26b1cc6. Extracted buildPersistedAssistantContent(rawBlocks, surfaces) which runs cleanAssistantContent → appends UI-surface blocks → redactSecrets over text. Both handleMessageComplete and flushAccumulatedContent now go through it, so the partial and final writes use the same content-build pipeline. Partial flushes pass surfaces=[] (surfaces aren't known mid-turn); final flush passes deps.ctx.currentTurnSurfaces.
The directive + warning side-effects from cleanAssistantContent are still invoked once from handleMessageComplete (they shouldn't fire on each partial flush).
There was a problem hiding this comment.
Round 2 in 6b97004.
Moved the running content view from EventHandlerState.accumulatedContentBlocks to AgentLoopConversationContext.currentMessageContent. It now lives at the turn level alongside currentTurnSurfaces, so it's the single source of truth for in-flight message content. The handler writes on text_delta and reads at flush time; handleMessageComplete resets it after the authoritative event.message.content write.
That fixes the "parallel state in the handler" smell. The truer reuse (have the agent loop emit a partial_message_complete carrying a Message-shaped snapshot built by the same path that produces event.message.content) would require provider-level work — the Anthropic SDK only exposes its internal assembly at stream.finalMessage() end-of-stream, so the loop would need to maintain its own mirror. Happy to do that next PR if you want the loop to fully own the running view.
| * forces an immediate flush regardless of the debounce timer; reset | ||
| * to 0 after every flush. | ||
| */ | ||
| accumulatedCharsSinceLastFlush: number; |
There was a problem hiding this comment.
let's just use timer's for now
There was a problem hiding this comment.
Done in 26b1cc6. Dropped PARTIAL_PERSIST_SIZE_THRESHOLD, accumulatedCharsSinceLastFlush, and the scheduleOrFireFlush size-gate branch. Only the 250ms debounce timer remains. Renamed the entry point to schedulePartialFlush for clarity. Tests updated to drive the flush by waiting on the timer instead of accumulating 1500-char chunks.
| name: event.name, | ||
| input: event.input, | ||
| }); | ||
| void flushAccumulatedContent(state, deps); |
There was a problem hiding this comment.
let's take a look at this
…ld pipeline Address review feedback on #32602: - Drop size-gate; debounce timer only (Vargas) - Drop eager flush in handleToolUse — AgentLoop emits tool_use AFTER message_complete, so any flush there would overwrite the finalized row (Codex P1 / Vargas line 872) - Extract buildPersistedAssistantContent shared between the partial flush and handleMessageComplete so both writes use the same cleanAssistantContent + surfaces + redactSecrets pipeline (Vargas line 378) - Track in-flight partial flush promise on EventHandlerState; await it at message_complete before the authoritative final write so a partial pipeline call dispatched moments before finalize cannot settle after it (Codex P2) - Scrub B6 / PR-number references from comments (Vargas line 266) - Tests: drop size-gate test, replace eager-flush test with 'handleToolUse does NOT flush' assertion, rewrite indexer/redact/ provider-error tests to use the 250ms debounce instead of size-gate
| // Indexer + projector still fire ONLY at `handleMessageComplete`; the | ||
| // row's `content` may temporarily reflect a partial assistant turn but | ||
| // is never indexed mid-stream. | ||
| const PARTIAL_PERSIST_DEBOUNCE_MS = 250; |
There was a problem hiding this comment.
I think we can bump this up to 1s
There was a problem hiding this comment.
Done in 6b97004. PARTIAL_PERSIST_DEBOUNCE_MS is now 1000. Test waits bumped to 1100ms.
|
|
||
| const log = getLogger("agent-loop-handlers"); | ||
|
|
||
| // ── Partial-persistence tunables ───────────────────────────────────── |
There was a problem hiding this comment.
make all these doc strings just 1 liners
There was a problem hiding this comment.
Done in 6b97004. Compressed the tunable block, EventHandlerState fields (pendingPartialFlushTimer + pendingPartialFlushPromise), buildPersistedAssistantContent, appendTextToCurrentMessage, resetPartialPersistAccumulator, flushAccumulatedContent, and schedulePartialFlush to one-liners.
| type AssistantSurface = | ||
| AgentLoopConversationContext["currentTurnSurfaces"] extends ReadonlyArray< | ||
| infer S | ||
| > | ||
| ? S | ||
| : never; |
There was a problem hiding this comment.
it's easier to just define AssistantSurface and import it than to extract it like this
There was a problem hiding this comment.
Done in 6b97004. Added export interface AssistantSurface { ... } in conversation-agent-loop.ts, retyped AgentLoopConversationContext.currentTurnSurfaces and Conversation.currentTurnSurfaces as AssistantSurface[], and the handler now imports the named type directly instead of infer-extracting.
| // No partial-persist flush from here: `AgentLoop.run` emits | ||
| // `tool_use` strictly AFTER `message_complete`, so any flush from | ||
| // this handler would land after the authoritative final | ||
| // `updateContent` and overwrite the finalized row. The tool_use | ||
| // block lands in the finalized content via `event.message.content` | ||
| // at `handleMessageComplete`. |
…ntext Address review feedback round 2 on #32602: - Move the partial-persist accumulator from EventHandlerState to AgentLoopConversationContext.currentMessageContent. The running view now lives at the turn level alongside currentTurnSurfaces, making it the single source of truth for in-flight message content. Handler becomes a writer; flushAccumulatedContent reads from ctx. - Export AssistantSurface as a named interface from conversation-agent-loop.ts so callers can import it directly instead of infer-extracting from the context type. currentTurnSurfaces is now typed as AssistantSurface[]. - Bump partial-persist debounce from 250ms to 1000ms. - Compress doc-strings to one-liners on the tunable block, EventHandlerState fields, helpers, and the build pipeline. - Delete the explanatory comment in handleToolUse (deleted handler body is self-explanatory after the eager flush removal). Test mocks updated; debounce-wait increased to 1100ms. 70/70 partial-persist tests pass; typecheck clean. All 20 tests that touch currentTurnSurfaces pass individually.
| * {@link AgentLoopConversationContext.currentMessageContent}). | ||
| * @internal | ||
| */ | ||
| currentMessageContent: ContentBlock[] = []; |
There was a problem hiding this comment.
Okay, this field looks redundant with the one in the agent loop, so let's consolidate for now.
There was a problem hiding this comment.
Done in 0a52601. Dropped both the Conversation.currentMessageContent field and the AgentLoopConversationContext.currentMessageContent field — consolidated onto EventHandlerState.currentMessageContent.
Reasoning: the running mirror has no readers outside the agent loop's per-turn lifecycle (unlike currentTurnSurfaces, which surface-content-routes.ts reads off Conversation). Putting it on the long-lived context forced a redundant field on Conversation for structural-typing parity with no payoff. EventHandlerState is where the rest of the per-turn agent-loop state lives (pendingPartialFlushTimer, toolUseIdToName, etc), so it's the natural home.
Helper signatures simplified: appendTextToCurrentMessage(state, text), resetPartialPersistAccumulator(state), flushAccumulatedContent(state, deps) reads state.currentMessageContent. Test mock dropped the now-unused ctx field; createEventHandlerState() factory initializes the new field so all existing handler-state tests get it for free.
Verified: 70/70 agent-loop tests pass, dependent test files (annotate-risk-options, tool-preview-lifecycle, outbound-slack-persistence, tool-result-metadata-plumbing, persistence-secret-redaction, max-tokens, disk-pressure, overflow, inference-profile) all green per-file, typecheck clean.
Consolidates currentMessageContent to a single home — per-turn EventHandlerState — eliminating the redundant field on Conversation (only needed for structural compatibility with AgentLoopConversationContext). The running mirror is per-turn state with no external readers, so it belongs alongside other per-turn agent-loop state (pendingPartialFlushTimer, toolUseIdToName, etc) rather than on the long-lived context. Per PR #32602 review feedback.
What
Closes the durability gap left by B3 (#32326): a row reserved at
llm_call_startedstayed empty for the full duration of a turn, so a refresh mid-turn rendered an empty assistant bubble where the in-progress reply should have been.B6 mirrors streamed text + tool_use blocks into a per-row accumulator on
EventHandlerStateand flushes viaupdateContenton a first-fire-wins debounce.How
Two gates, first-fire wins:
PARTIAL_PERSIST_DEBOUNCE_MS = 250— at most ~4 writes/sec on a steady stream, so a refresh never lands more than ~250ms behind the wire.PARTIAL_PERSIST_SIZE_THRESHOLD = 1024— fast bursts that wouldn't trip the time gate still flush.Call sites:
handleLlmCallStartedhandleTextDeltadrained.emitText(not rawevent.text, so directive markup never persists), then scheduleshandleToolUse{type: "tool_use", id, name, input}block and flushes eagerly — tool blocks are atomichandleMessageCompleteThe helpers (
appendTextToAccumulator,resetPartialPersistAccumulator,flushAccumulatedContent,scheduleOrFireFlush) live alongside the existing handlers.What does NOT change
message_complete. Indexing partial content would surface mid-turn fragments in memory recall. Pinned by a new test.event.message.contentstill carries them, so finalized rows remain authoritative.handleMessageComplete's discipline.Failure modes
assistantRowAwaitingFinalizationflag triggers cleanup before the new reservation.handleLlmCallStartedresets the accumulator for the fresh row.message_completerecovers.handleMessageCompletebefore the finalupdateContentruns.Tests
7 new tests in a
describe("B6 partial persistence", …)block:handleToolUseflushes eagerly (no debounce).handleMessageCompleteclears any pending debounce timer.All 71 tests in
conversation-agent-loop.test.tspass (64 B3 baseline + 7 new B6).Acceptance
bun test src/__tests__/conversation-agent-loop.test.tsgreen; typecheck + lint + prettier clean.Follow-up
B7 (SSE seq + Last-Event-ID replay) closes the remaining "deltas between last flush and refresh" seam on the wire. B6 narrows the gap from "everything before now" to "the last 250ms window"; B7 will close it to zero.