From 88ea62e5f14b7f814fc9a313cce0661f279186db Mon Sep 17 00:00:00 2001 From: tirmansidhu Date: Thu, 28 May 2026 19:51:04 +0100 Subject: [PATCH] streaming: add MessageStreamReducer and MessageStore (unused by UI) --- .../shared/Features/Chat/ChatViewModel.swift | 20 ++ .../shared/Features/Chat/MessageStore.swift | 165 +++++++++++++ .../Features/Chat/MessageStreamReducer.swift | 228 ++++++++++++++++++ .../Network/Generated/GeneratedAPITypes.swift | 150 +++++++++++- clients/shared/Network/MessageTypes.swift | 36 +++ 5 files changed, 595 insertions(+), 4 deletions(-) create mode 100644 clients/shared/Features/Chat/MessageStore.swift create mode 100644 clients/shared/Features/Chat/MessageStreamReducer.swift diff --git a/clients/shared/Features/Chat/ChatViewModel.swift b/clients/shared/Features/Chat/ChatViewModel.swift index 117c93783bb..ae543146e7a 100644 --- a/clients/shared/Features/Chat/ChatViewModel.swift +++ b/clients/shared/Features/Chat/ChatViewModel.swift @@ -708,6 +708,14 @@ public final class ChatViewModel: MessageSendCoordinatorDelegate { static let maxImageSize = ChatAttachmentManager.maxImageSize public let subagentDetailStore = SubagentDetailStore() + /// New transcript representation populated by `MessageStreamReducer` from + /// the daemon's PR-1 streaming events (`message_open` / `block_open` / + /// `block_close` / `message_close`). Currently **unused by any view** — + /// the on-screen transcript is still driven by the legacy `messages` + /// array. PR 4 of the streaming-message-architecture plan flips renderers + /// to read from this store. + public let messageStore = MessageStore() + @ObservationIgnored let messageStreamReducer: MessageStreamReducer let connectionManager: GatewayConnectionManager let eventStreamClient: EventStreamClient private let settingsClient: any SettingsClientProtocol @@ -1286,6 +1294,13 @@ public final class ChatViewModel: MessageSendCoordinatorDelegate { self.interactionClient = interactionClient self.conversationQueueClient = conversationQueueClient self.onToolCallsComplete = onToolCallsComplete + // Initialize the new streaming-architecture reducer. The data path is + // wired so events accumulate into `messageStore`, but no view reads + // from the store yet (see the property doc comment for context). + self.messageStreamReducer = MessageStreamReducer( + store: messageStore, + eventStreamClient: eventStreamClient + ) self.paginationState = ChatPaginationState( messageManager: messageManager ) @@ -1313,6 +1328,11 @@ public final class ChatViewModel: MessageSendCoordinatorDelegate { // Initialize the action handler for server message dispatch. self.actionHandler = ChatActionHandler(viewModel: self) + // Start consuming streaming events into the new MessageStore. Output + // is not rendered by any view in this PR — see `messageStore` doc + // comment for the rollout plan. + self.messageStreamReducer.start() + // Surface attachment validation errors in the error manager so the UI // can show them without the attachment manager needing a direct reference. attachmentManager.onError = { [weak self] message in diff --git a/clients/shared/Features/Chat/MessageStore.swift b/clients/shared/Features/Chat/MessageStore.swift new file mode 100644 index 00000000000..0926eacc77d --- /dev/null +++ b/clients/shared/Features/Chat/MessageStore.swift @@ -0,0 +1,165 @@ +import Foundation + +/// In-memory snapshot of a single content block within an assistant message. +/// +/// A block is either a streamed text block (`type == .text`) or a tool +/// invocation block (`type == .toolUse`). Mirrors the daemon's wire-protocol +/// block model so the reducer can populate fields verbatim from incoming +/// `block_open`, `assistant_text_delta`, `tool_use_start`, `tool_input_delta`, +/// `tool_result`, and `block_close` events. +/// +/// Idempotency invariant: every mutation in `MessageStreamReducer` is gated on +/// the `seq` watermark stored in `MessageSnapshot`, so re-applying a delivered +/// event is a no-op. +public struct BlockSnapshot: Sendable { + public enum Kind: Sendable, Equatable { + case text + case toolUse + } + + public var type: Kind + /// Accumulated text content for `.text` blocks. Empty for `.toolUse` blocks. + public var text: String + /// Tool name for `.toolUse` blocks. `nil` for `.text` blocks. + public var toolName: String? + /// Stable tool-use id from the agent (correlates with confirmations and results). + public var toolUseId: String? + /// Accumulated raw input JSON streamed via `tool_input_delta`. Note this is + /// the streamed partial JSON shape — final structured input is delivered by + /// `tool_use_start` and stored in `toolInput`. + public var toolInputJson: String + /// Structured input record produced by the model (populated from `tool_use_start`). + public var toolInput: [String: AnyCodable]? + /// Final tool result payload (populated from `tool_result`). + public var toolResult: ToolResultMessage? + /// True after a matching `block_close` event has been observed. + public var isComplete: Bool + + public init( + type: Kind, + text: String = "", + toolName: String? = nil, + toolUseId: String? = nil, + toolInputJson: String = "", + toolInput: [String: AnyCodable]? = nil, + toolResult: ToolResultMessage? = nil, + isComplete: Bool = false + ) { + self.type = type + self.text = text + self.toolName = toolName + self.toolUseId = toolUseId + self.toolInputJson = toolInputJson + self.toolInput = toolInput + self.toolResult = toolResult + self.isComplete = isComplete + } +} + +/// In-memory snapshot of a single assistant message, indexed by `messageId`. +/// +/// Holds the message-level metadata declared by `message_open` plus the array +/// of content blocks declared by `block_open` / closed by `block_close`. The +/// `seqWatermarks` table tracks the highest `seq` applied per block (with +/// `-1` reserved as the message-level watermark) so the reducer can ignore +/// out-of-order or duplicate events. +public struct MessageSnapshot: Sendable { + /// Stable assistant-message id declared by `message_open`. + public let id: String + /// "assistant". + public var role: String + /// Blocks indexed by `blockIndex`. Sparse during streaming; entries are + /// created on first `block_open` for a given index. + public var blocks: [Int: BlockSnapshot] + /// True after a matching `message_close` event has been observed. + public var isComplete: Bool + /// Highest `seq` applied per `(messageId, blockIndex)`. The sentinel + /// `blockIndex == -1` records message-level events (`message_open`, + /// `message_close`). New events with `seq <= watermark` are dropped. + public var seqWatermarks: [Int: Int] + + public init( + id: String, + role: String = "assistant", + blocks: [Int: BlockSnapshot] = [:], + isComplete: Bool = false, + seqWatermarks: [Int: Int] = [:] + ) { + self.id = id + self.role = role + self.blocks = blocks + self.isComplete = isComplete + self.seqWatermarks = seqWatermarks + } + + /// Ordered view of the blocks for rendering (sorted by `blockIndex`). + public var orderedBlocks: [(index: Int, block: BlockSnapshot)] { + blocks.keys.sorted().map { ($0, blocks[$0]!) } + } +} + +/// Reactive store of `MessageSnapshot` values keyed by `messageId`. +/// +/// Designed to be the new source of truth for the chat transcript, replacing +/// the legacy `messages` array on `ChatViewModel`. As of PR 3, the store is +/// populated by `MessageStreamReducer` from the SSE event stream but is **not +/// rendered by any view** — UI continues to read from the legacy state. PR 4 +/// in the streaming-message-architecture plan flips renderers to consume this +/// store. +/// +/// Mutations happen exclusively on the main actor (so SwiftUI observers see +/// consistent snapshots), matching the existing store pattern in this +/// directory (see `ContactsStore`, `DirectoryStore`, `SettingsStore`). +@MainActor @Observable +public final class MessageStore { + + /// Snapshots keyed by `messageId`. + public var messages: [String: MessageSnapshot] = [:] + + /// Insertion order of `messageId` for stable rendering. Populated when a + /// message is first inserted via `upsertMessage`. + public var messageOrder: [String] = [] + + public init() {} + + // MARK: - Convenience accessors + + public func message(id: String) -> MessageSnapshot? { + messages[id] + } + + /// Ordered messages for rendering. Insertion order is preserved across + /// updates so streaming bubbles don't reshuffle as new blocks arrive. + public var orderedMessages: [MessageSnapshot] { + messageOrder.compactMap { messages[$0] } + } + + // MARK: - Mutation helpers + // + // Called by `MessageStreamReducer`. Public so tests can exercise the store + // directly, but production callers should always route through the reducer + // to preserve the seq-watermark idempotency invariant. + + /// Insert an empty message snapshot if one does not already exist. + /// Returns `true` if a new snapshot was inserted, `false` if the id was + /// already present (idempotent re-application). + @discardableResult + public func upsertMessage(id: String, role: String) -> Bool { + if messages[id] != nil { return false } + messages[id] = MessageSnapshot(id: id, role: role) + messageOrder.append(id) + return true + } + + public func updateMessage(id: String, mutate: (inout MessageSnapshot) -> Void) { + guard var snapshot = messages[id] else { return } + mutate(&snapshot) + messages[id] = snapshot + } + + /// Reset the store. Used when switching conversations or on logout. + public func reset() { + messages.removeAll() + messageOrder.removeAll() + } +} diff --git a/clients/shared/Features/Chat/MessageStreamReducer.swift b/clients/shared/Features/Chat/MessageStreamReducer.swift new file mode 100644 index 00000000000..2ce88899401 --- /dev/null +++ b/clients/shared/Features/Chat/MessageStreamReducer.swift @@ -0,0 +1,228 @@ +import Foundation + +/// Applies streaming SSE events to a `MessageStore`, producing the new +/// transcript representation introduced by the streaming-message-architecture +/// plan (see `.private/plans/streaming-message-architecture.md`). +/// +/// The reducer is **idempotent**: re-delivering an event that has already been +/// applied is a no-op. Idempotency is enforced by tracking the highest `seq` +/// applied per `(messageId, blockIndex)` — events with a `seq` less than or +/// equal to the watermark are silently dropped. Message-level events +/// (`message_open`, `message_close`) use the sentinel block index `-1`. +/// +/// As of PR 3, the reducer consumes the new event types from `EventStreamClient` +/// but **the resulting `MessageStore` is unused by any view**. The legacy +/// streaming path on `ChatViewModel` still drives the on-screen transcript. +/// PR 4 flips renderers to read from the store. +/// +/// Lifecycle: an instance is owned by `ChatViewModel` (one per chat scope). +/// Call `start()` after construction to begin consuming events; `stop()` to +/// tear down the subscription (or rely on `deinit`, which cancels the task). +@MainActor +public final class MessageStreamReducer { + /// Sentinel `blockIndex` used to record message-level events in the + /// per-message watermark table (`message_open`, `message_close`). + private static let messageLevelBlockIndex = -1 + + /// The store this reducer mutates. Owned by the caller. + public let store: MessageStore + + private let eventStreamClient: EventStreamClient + private var subscriptionTask: Task? + + public init(store: MessageStore, eventStreamClient: EventStreamClient) { + self.store = store + self.eventStreamClient = eventStreamClient + } + + deinit { + subscriptionTask?.cancel() + } + + /// Subscribe to the `EventStreamClient` and feed every message into + /// `apply(event:)`. Safe to call multiple times — re-subscribes after + /// cancelling the previous task. + public func start() { + subscriptionTask?.cancel() + subscriptionTask = Task { [weak self] in + guard let self else { return } + let stream = self.eventStreamClient.subscribe() + for await message in stream { + if Task.isCancelled { return } + self.apply(event: message) + } + } + } + + /// Cancel the active subscription. The store is left as-is. + public func stop() { + subscriptionTask?.cancel() + subscriptionTask = nil + } + + // MARK: - Apply + + /// Apply a single `ServerMessage` to the underlying `MessageStore`. + /// Events that are not part of the new streaming protocol are ignored. + public func apply(event: ServerMessage) { + switch event { + case .messageOpen(let msg): + applyMessageOpen(msg) + case .blockOpen(let msg): + applyBlockOpen(msg) + case .blockClose(let msg): + applyBlockClose(msg) + case .messageClose(let msg): + applyMessageClose(msg) + case .assistantTextDelta(let msg): + applyTextDelta(msg) + case .toolUseStart(let msg): + applyToolUseStart(msg) + case .toolInputDelta(let msg): + applyToolInputDelta(msg) + case .toolResult(let msg): + applyToolResult(msg) + default: + // All other events are out of scope for the new streaming + // architecture and intentionally ignored. + break + } + } + + // MARK: - Event handlers + + private func applyMessageOpen(_ msg: MessageOpenMessage) { + guard shouldApply(messageId: msg.messageId, blockIndex: Self.messageLevelBlockIndex, seq: msg.seq) else { return } + store.upsertMessage(id: msg.messageId, role: msg.role) + recordSeq(messageId: msg.messageId, blockIndex: Self.messageLevelBlockIndex, seq: msg.seq) + } + + private func applyBlockOpen(_ msg: BlockOpenMessage) { + guard shouldApply(messageId: msg.messageId, blockIndex: msg.blockIndex, seq: msg.seq) else { return } + // Ensure the parent message exists. If `message_open` arrived first + // (the canonical ordering) this is a no-op; otherwise we synthesize a + // bare snapshot so the block has somewhere to live. + store.upsertMessage(id: msg.messageId, role: "assistant") + let kind: BlockSnapshot.Kind = (msg.blockType == "tool_use") ? .toolUse : .text + store.updateMessage(id: msg.messageId) { snapshot in + if snapshot.blocks[msg.blockIndex] == nil { + snapshot.blocks[msg.blockIndex] = BlockSnapshot( + type: kind, + toolName: msg.toolName, + toolUseId: msg.toolUseId + ) + } + } + recordSeq(messageId: msg.messageId, blockIndex: msg.blockIndex, seq: msg.seq) + } + + private func applyBlockClose(_ msg: BlockCloseMessage) { + guard shouldApply(messageId: msg.messageId, blockIndex: msg.blockIndex, seq: msg.seq) else { return } + store.updateMessage(id: msg.messageId) { snapshot in + snapshot.blocks[msg.blockIndex]?.isComplete = true + } + recordSeq(messageId: msg.messageId, blockIndex: msg.blockIndex, seq: msg.seq) + } + + private func applyMessageClose(_ msg: MessageCloseMessage) { + guard shouldApply(messageId: msg.messageId, blockIndex: Self.messageLevelBlockIndex, seq: msg.seq) else { return } + store.updateMessage(id: msg.messageId) { snapshot in + snapshot.isComplete = true + } + recordSeq(messageId: msg.messageId, blockIndex: Self.messageLevelBlockIndex, seq: msg.seq) + } + + private func applyTextDelta(_ msg: AssistantTextDeltaMessage) { + guard let messageId = msg.messageId, let blockIndex = msg.blockIndex else { + // Synthetic / pre-anchor deltas are scoped to the legacy renderer. + return + } + guard shouldApply(messageId: messageId, blockIndex: blockIndex, seq: msg.seq) else { return } + // The block may not have been opened yet if a text delta beats its + // `block_open` event under network reordering. Lazily materialize a + // text block so the chunk isn't dropped. + store.upsertMessage(id: messageId, role: "assistant") + store.updateMessage(id: messageId) { snapshot in + if snapshot.blocks[blockIndex] == nil { + snapshot.blocks[blockIndex] = BlockSnapshot(type: .text) + } + snapshot.blocks[blockIndex]?.text.append(msg.text) + } + if let seq = msg.seq { + recordSeq(messageId: messageId, blockIndex: blockIndex, seq: seq) + } + } + + private func applyToolUseStart(_ msg: ToolUseStartMessage) { + guard let messageId = msg.messageId, let blockIndex = msg.blockIndex else { return } + guard shouldApply(messageId: messageId, blockIndex: blockIndex, seq: msg.seq) else { return } + store.upsertMessage(id: messageId, role: "assistant") + store.updateMessage(id: messageId) { snapshot in + if snapshot.blocks[blockIndex] == nil { + snapshot.blocks[blockIndex] = BlockSnapshot(type: .toolUse) + } + snapshot.blocks[blockIndex]?.type = .toolUse + snapshot.blocks[blockIndex]?.toolName = msg.toolName + snapshot.blocks[blockIndex]?.toolUseId = msg.toolUseId + snapshot.blocks[blockIndex]?.toolInput = msg.input + } + if let seq = msg.seq { + recordSeq(messageId: messageId, blockIndex: blockIndex, seq: seq) + } + } + + private func applyToolInputDelta(_ msg: ToolInputDeltaMessage) { + guard let messageId = msg.messageId, let blockIndex = msg.blockIndex else { return } + guard shouldApply(messageId: messageId, blockIndex: blockIndex, seq: msg.seq) else { return } + store.upsertMessage(id: messageId, role: "assistant") + store.updateMessage(id: messageId) { snapshot in + if snapshot.blocks[blockIndex] == nil { + snapshot.blocks[blockIndex] = BlockSnapshot(type: .toolUse, toolName: msg.toolName, toolUseId: msg.toolUseId) + } + snapshot.blocks[blockIndex]?.toolInputJson.append(msg.content) + } + if let seq = msg.seq { + recordSeq(messageId: messageId, blockIndex: blockIndex, seq: seq) + } + } + + private func applyToolResult(_ msg: ToolResultMessage) { + guard let messageId = msg.messageId, let blockIndex = msg.blockIndex else { return } + guard shouldApply(messageId: messageId, blockIndex: blockIndex, seq: msg.seq) else { return } + store.upsertMessage(id: messageId, role: "assistant") + store.updateMessage(id: messageId) { snapshot in + if snapshot.blocks[blockIndex] == nil { + snapshot.blocks[blockIndex] = BlockSnapshot(type: .toolUse, toolName: msg.toolName, toolUseId: msg.toolUseId) + } + snapshot.blocks[blockIndex]?.toolResult = msg + } + if let seq = msg.seq { + recordSeq(messageId: messageId, blockIndex: blockIndex, seq: seq) + } + } + + // MARK: - Idempotency + + /// Returns `true` when the event should be applied. The contract: + /// - If `seq` is `nil` (legacy daemon pre-PR-1), apply unconditionally — + /// we cannot deduplicate without a sequence number. The legacy renderer + /// already tolerates duplicates. + /// - Otherwise, drop events whose `seq` is `<=` the recorded watermark. + private func shouldApply(messageId: String, blockIndex: Int, seq: Int?) -> Bool { + guard let seq else { return true } + if let watermark = store.messages[messageId]?.seqWatermarks[blockIndex], + seq <= watermark { + return false + } + return true + } + + private func recordSeq(messageId: String, blockIndex: Int, seq: Int) { + store.updateMessage(id: messageId) { snapshot in + if let existing = snapshot.seqWatermarks[blockIndex], existing >= seq { + return + } + snapshot.seqWatermarks[blockIndex] = seq + } + } +} diff --git a/clients/shared/Network/Generated/GeneratedAPITypes.swift b/clients/shared/Network/Generated/GeneratedAPITypes.swift index 597311c8d05..5acd57d8123 100644 --- a/clients/shared/Network/Generated/GeneratedAPITypes.swift +++ b/clients/shared/Network/Generated/GeneratedAPITypes.swift @@ -460,11 +460,26 @@ public struct AssistantTextDelta: Codable, Sendable { public let type: String public let text: String public let conversationId: String? + /// Database row id of the assistant message this delta belongs to. Stamped + /// from the pre-allocated turn anchor (see `AssistantTurnStartEvent`). + /// Absent on streams produced by older daemons that pre-date the anchor + /// protocol or on synthetic deltas (canned greetings, slash-command echoes). + public let messageId: String? + /// 0-based content-block index within the parent `messageId`. Optional for + /// backwards compatibility with synthetic deltas that don't bind to a block. + public let blockIndex: Int? + /// Monotonically increasing per-conversation sequence number for idempotent + /// client replay. Optional during the streaming-architecture rollout — + /// daemons that pre-date the protocol omit it. + public let seq: Int? - public init(type: String, text: String, conversationId: String? = nil) { + public init(type: String, text: String, conversationId: String? = nil, messageId: String? = nil, blockIndex: Int? = nil, seq: Int? = nil) { self.type = type self.text = text self.conversationId = conversationId + self.messageId = messageId + self.blockIndex = blockIndex + self.seq = seq } } @@ -514,6 +529,63 @@ public struct AvatarUpdated: Codable, Sendable { } } +/// `block_open` SSE event. Declares the start of a new content block within an +/// assistant message. Paired with `BlockClose` at the block's end. +/// +/// Block kinds: +/// - `text` — a streamed text block opened on the first text delta +/// emitted after the previous block closed. +/// - `tool_use` — a tool invocation; opened immediately before the matching +/// `ToolUseStart` and closed when the corresponding `ToolResult` +/// arrives. +/// +/// `blockIndex` is 0-based and monotonically increases within a single message. +public struct BlockOpenEvent: Codable, Sendable { + public let type: String + public let messageId: String + public let blockIndex: Int + /// "text" or "tool_use". + public let blockType: String + /// Tool name when `blockType == "tool_use"`; absent otherwise. + public let toolName: String? + /// Tool-use id when `blockType == "tool_use"`; absent otherwise. + public let toolUseId: String? + /// Monotonically increasing per-conversation sequence number. + public let seq: Int + public let conversationId: String? + + public init(type: String = "block_open", messageId: String, blockIndex: Int, blockType: String, toolName: String? = nil, toolUseId: String? = nil, seq: Int, conversationId: String? = nil) { + self.type = type + self.messageId = messageId + self.blockIndex = blockIndex + self.blockType = blockType + self.toolName = toolName + self.toolUseId = toolUseId + self.seq = seq + self.conversationId = conversationId + } +} + +/// `block_close` SSE event. Peer of `BlockOpen`. Text blocks close when the +/// next non-text content starts (or when the turn ends); tool_use blocks close +/// when their matching `ToolResult` arrives. +public struct BlockCloseEvent: Codable, Sendable { + public let type: String + public let messageId: String + public let blockIndex: Int + /// Monotonically increasing per-conversation sequence number. + public let seq: Int + public let conversationId: String? + + public init(type: String = "block_close", messageId: String, blockIndex: Int, seq: Int, conversationId: String? = nil) { + self.type = type + self.messageId = messageId + self.blockIndex = blockIndex + self.seq = seq + self.conversationId = conversationId + } +} + public struct BundleAppRequest: Codable, Sendable { public let type: String public let appId: String @@ -2760,6 +2832,46 @@ public struct MessageComplete: Codable, Sendable { } } +/// `message_open` SSE event. Declares a stable `messageId` (UUIDv7) for an +/// assistant message at the start of a turn, before the first content event. +/// Paired with `MessageClose` at end-of-turn. Clients should anchor a message +/// bubble at `MessageOpen` instead of inferring identity from the first delta. +public struct MessageOpenEvent: Codable, Sendable { + public let type: String + public let messageId: String + /// "assistant". + public let role: String + /// Monotonically increasing per-conversation sequence number. + public let seq: Int + public let conversationId: String? + + public init(type: String = "message_open", messageId: String, role: String, seq: Int, conversationId: String? = nil) { + self.type = type + self.messageId = messageId + self.role = role + self.seq = seq + self.conversationId = conversationId + } +} + +/// `message_close` SSE event. Peer of `MessageOpen`. Marks the assistant turn +/// done in the new streaming architecture; the legacy `MessageComplete` event +/// continues to fire alongside it during the rollout for backward compatibility. +public struct MessageCloseEvent: Codable, Sendable { + public let type: String + public let messageId: String + /// Monotonically increasing per-conversation sequence number. + public let seq: Int + public let conversationId: String? + + public init(type: String = "message_close", messageId: String, seq: Int, conversationId: String? = nil) { + self.type = type + self.messageId = messageId + self.seq = seq + self.conversationId = conversationId + } +} + public struct MessageContentRequest: Codable, Sendable { public let type: String public let conversationId: String @@ -4809,13 +4921,23 @@ public struct ToolInputDelta: Codable, Sendable { public let conversationId: String? /// The tool_use block ID for client-side correlation. public let toolUseId: String? + /// Database row id of the assistant message that owns this tool_use block. + /// Same semantics as `AssistantTextDelta.messageId`. + public let messageId: String? + /// 0-based content-block index within the parent `messageId`. + public let blockIndex: Int? + /// Monotonically increasing per-conversation sequence number. + public let seq: Int? - public init(type: String, toolName: String, content: String, conversationId: String? = nil, toolUseId: String? = nil) { + public init(type: String, toolName: String, content: String, conversationId: String? = nil, toolUseId: String? = nil, messageId: String? = nil, blockIndex: Int? = nil, seq: Int? = nil) { self.type = type self.toolName = toolName self.content = content self.conversationId = conversationId self.toolUseId = toolUseId + self.messageId = messageId + self.blockIndex = blockIndex + self.seq = seq } } @@ -4997,8 +5119,15 @@ public struct ToolResult: Codable, Sendable { /// trust rule from the chip-ladder UI. public let riskAllowlistOptions: [ConfirmationRequestAllowlistOption]? public let riskDirectoryScopeOptions: [ConfirmationRequestDirectoryScopeOption]? + /// Database row id of the assistant message that owns the parent tool_use + /// block. Same semantics as `AssistantTextDelta.messageId`. + public let messageId: String? + /// 0-based content-block index within the parent `messageId`. + public let blockIndex: Int? + /// Monotonically increasing per-conversation sequence number. + public let seq: Int? - public init(type: String, toolName: String, result: String, isError: Bool? = nil, diff: ToolResultDiff? = nil, status: String? = nil, conversationId: String? = nil, imageDataList: [String]? = nil, toolUseId: String? = nil, riskLevel: String? = nil, riskReason: String? = nil, matchedTrustRuleId: String? = nil, approvalMode: String? = nil, approvalReason: String? = nil, riskThreshold: String? = nil, isContainerized: Bool? = nil, riskScopeOptions: [ToolResultRiskScopeOption]? = nil, riskAllowlistOptions: [ConfirmationRequestAllowlistOption]? = nil, riskDirectoryScopeOptions: [ConfirmationRequestDirectoryScopeOption]? = nil) { + public init(type: String, toolName: String, result: String, isError: Bool? = nil, diff: ToolResultDiff? = nil, status: String? = nil, conversationId: String? = nil, imageDataList: [String]? = nil, toolUseId: String? = nil, riskLevel: String? = nil, riskReason: String? = nil, matchedTrustRuleId: String? = nil, approvalMode: String? = nil, approvalReason: String? = nil, riskThreshold: String? = nil, isContainerized: Bool? = nil, riskScopeOptions: [ToolResultRiskScopeOption]? = nil, riskAllowlistOptions: [ConfirmationRequestAllowlistOption]? = nil, riskDirectoryScopeOptions: [ConfirmationRequestDirectoryScopeOption]? = nil, messageId: String? = nil, blockIndex: Int? = nil, seq: Int? = nil) { self.type = type self.toolName = toolName self.result = result @@ -5018,6 +5147,9 @@ public struct ToolResult: Codable, Sendable { self.riskScopeOptions = riskScopeOptions self.riskAllowlistOptions = riskAllowlistOptions self.riskDirectoryScopeOptions = riskDirectoryScopeOptions + self.messageId = messageId + self.blockIndex = blockIndex + self.seq = seq } } @@ -5065,13 +5197,23 @@ public struct ToolUseStart: Codable, Sendable { public let conversationId: String? /// The tool_use block ID for client-side correlation. public let toolUseId: String? + /// Database row id of the assistant message that owns this tool_use block. + /// Same semantics as `AssistantTextDelta.messageId`. + public let messageId: String? + /// 0-based content-block index within the parent `messageId`. + public let blockIndex: Int? + /// Monotonically increasing per-conversation sequence number. + public let seq: Int? - public init(type: String, toolName: String, input: [String: AnyCodable], conversationId: String? = nil, toolUseId: String? = nil) { + public init(type: String, toolName: String, input: [String: AnyCodable], conversationId: String? = nil, toolUseId: String? = nil, messageId: String? = nil, blockIndex: Int? = nil, seq: Int? = nil) { self.type = type self.toolName = toolName self.input = input self.conversationId = conversationId self.toolUseId = toolUseId + self.messageId = messageId + self.blockIndex = blockIndex + self.seq = seq } } diff --git a/clients/shared/Network/MessageTypes.swift b/clients/shared/Network/MessageTypes.swift index ea4125c2463..8792e17eb27 100644 --- a/clients/shared/Network/MessageTypes.swift +++ b/clients/shared/Network/MessageTypes.swift @@ -687,6 +687,26 @@ extension AssistantTextDelta { } } +/// Declares the start of an assistant message — carries a stable `messageId` +/// that the block-scoped events in this turn stamp on. Paired with +/// `MessageCloseMessage` at end-of-turn. +/// Backed by generated `MessageOpenEvent`. +public typealias MessageOpenMessage = MessageOpenEvent + +/// Declares the start of a content block within an assistant message. +/// Paired with `BlockCloseMessage`. +/// Backed by generated `BlockOpenEvent`. +public typealias BlockOpenMessage = BlockOpenEvent + +/// Declares the end of a content block within an assistant message. +/// Peer of `BlockOpenMessage`. +/// Backed by generated `BlockCloseEvent`. +public typealias BlockCloseMessage = BlockCloseEvent + +/// Declares the end of an assistant turn. Peer of `MessageOpenMessage`. +/// Backed by generated `MessageCloseEvent`. +public typealias MessageCloseMessage = MessageCloseEvent + /// Streamed thinking delta from the assistant's reasoning. public typealias AssistantThinkingDeltaMessage = AssistantThinkingDelta @@ -2914,6 +2934,10 @@ public enum ServerMessage: Decodable, Sendable { case assistantTextDelta(AssistantTextDeltaMessage) case assistantActivityState(AssistantActivityStateMessage) case assistantThinkingDelta(AssistantThinkingDeltaMessage) + case messageOpen(MessageOpenMessage) + case blockOpen(BlockOpenMessage) + case blockClose(BlockCloseMessage) + case messageClose(MessageCloseMessage) case messageComplete(MessageCompleteMessage) case conversationInfo(ConversationInfoMessage) case conversationInferenceProfileUpdated(ConversationInferenceProfileUpdatedMessage) @@ -3135,6 +3159,18 @@ public enum ServerMessage: Decodable, Sendable { case "assistant_thinking_delta": let message = try AssistantThinkingDeltaMessage(from: decoder) self = .assistantThinkingDelta(message) + case "message_open": + let message = try MessageOpenMessage(from: decoder) + self = .messageOpen(message) + case "block_open": + let message = try BlockOpenMessage(from: decoder) + self = .blockOpen(message) + case "block_close": + let message = try BlockCloseMessage(from: decoder) + self = .blockClose(message) + case "message_close": + let message = try MessageCloseMessage(from: decoder) + self = .messageClose(message) case "message_complete": let message = try MessageCompleteMessage(from: decoder) self = .messageComplete(message)