From 5b81578dc7a809f0f29ea3780232dd0c0daebc11 Mon Sep 17 00:00:00 2001 From: tirmansidhu Date: Thu, 28 May 2026 20:56:51 +0100 Subject: [PATCH] streaming: collapse EventStreamClient fan-out into typed routing --- .../App/AppDelegate+ConnectionSetup.swift | 2 +- .../App/DiskPressureStatusStore.swift | 2 +- .../ComputerUse/HostCuExecutor.swift | 2 +- .../MainWindow/ConversationManager.swift | 10 +- .../MainWindow/ConversationRestorer.swift | 2 +- .../ConversationSelectionStore.swift | 4 +- .../Features/MainWindow/MainWindow.swift | 2 +- .../Features/MainWindow/MainWindowView.swift | 8 +- .../MainWindow/ThreadWindowManager.swift | 2 +- .../Features/Settings/SettingsStore.swift | 2 +- .../shared/Features/Chat/ChatViewModel.swift | 89 ++--- .../Chat/MessageSendCoordinator.swift | 12 +- .../Features/Chat/MessageStreamReducer.swift | 8 +- .../Features/Contacts/ContactsStore.swift | 2 +- .../Features/Directory/DirectoryStore.swift | 2 +- .../shared/Network/EventStreamClient.swift | 330 +++++++++++++++--- .../Tests/GatewayConnectionManagerTests.swift | 11 +- 17 files changed, 356 insertions(+), 134 deletions(-) diff --git a/clients/macos/vellum-assistant/App/AppDelegate+ConnectionSetup.swift b/clients/macos/vellum-assistant/App/AppDelegate+ConnectionSetup.swift index 7ed42b356b6..d2bad440c02 100644 --- a/clients/macos/vellum-assistant/App/AppDelegate+ConnectionSetup.swift +++ b/clients/macos/vellum-assistant/App/AppDelegate+ConnectionSetup.swift @@ -219,7 +219,7 @@ extension AppDelegate { eventSubscriptionTask?.cancel() eventSubscriptionTask = Task { @MainActor [weak self] in guard let self else { return } - let stream = self.eventStreamClient.subscribe() + let stream = self.eventStreamClient.subscribeAppDelegateEvents() for await message in stream { guard !Task.isCancelled else { break } switch message { diff --git a/clients/macos/vellum-assistant/App/DiskPressureStatusStore.swift b/clients/macos/vellum-assistant/App/DiskPressureStatusStore.swift index 8203594c6c1..97968d12509 100644 --- a/clients/macos/vellum-assistant/App/DiskPressureStatusStore.swift +++ b/clients/macos/vellum-assistant/App/DiskPressureStatusStore.swift @@ -212,7 +212,7 @@ final class DiskPressureStatusStore { guard eventTask == nil, let eventStreamClient else { return } eventTask = Task { @MainActor [weak self] in guard let self else { return } - for await message in eventStreamClient.subscribe() { + for await message in eventStreamClient.subscribeDiskPressureEvents() { guard !Task.isCancelled else { break } self.handle(message) } diff --git a/clients/macos/vellum-assistant/ComputerUse/HostCuExecutor.swift b/clients/macos/vellum-assistant/ComputerUse/HostCuExecutor.swift index 41267eb2ab6..94c921dc2d4 100644 --- a/clients/macos/vellum-assistant/ComputerUse/HostCuExecutor.swift +++ b/clients/macos/vellum-assistant/ComputerUse/HostCuExecutor.swift @@ -32,7 +32,7 @@ enum HostCuExecutor { on client: GatewayConnectionManager, overlayProvider: @escaping @MainActor (_ conversationId: String, _ request: HostCuRequest) -> HostCuSessionProxy? = { _, _ in nil } ) { - // No-op: host CU requests are handled via EventStreamClient.subscribe() in AppDelegate. + // No-op: host CU requests are handled via EventStreamClient.subscribeAppDelegateEvents() in AppDelegate. } } diff --git a/clients/macos/vellum-assistant/Features/MainWindow/ConversationManager.swift b/clients/macos/vellum-assistant/Features/MainWindow/ConversationManager.swift index 877b45de336..26676da98c6 100644 --- a/clients/macos/vellum-assistant/Features/MainWindow/ConversationManager.swift +++ b/clients/macos/vellum-assistant/Features/MainWindow/ConversationManager.swift @@ -266,7 +266,7 @@ final class ConversationManager: ConversationRestorerDelegate { } Task { @MainActor [weak self] in guard let self else { return } - for await message in self.eventStreamClient.subscribe() { + for await message in self.eventStreamClient.subscribeConversationOrchestrationEvents() { switch message { case .conversationIdResolved(let localId, let serverId): self.resolveConversationId(from: localId, to: serverId) @@ -744,7 +744,9 @@ final class ConversationManager: ConversationRestorerDelegate { if markHistoryLoaded { viewModel.isHistoryLoaded = true } - viewModel.startMessageLoop() + // No-op when the VM was already initialized — chat-event subscription + // is started at VM init and runs for the VM lifetime. + viewModel.startChatEventSubscription() listStore.conversations.insert(conversation, at: 0) selectionStore.chatViewModels[conversation.id] = viewModel @@ -1453,7 +1455,7 @@ final class ConversationManager: ConversationRestorerDelegate { if let viewModel = selectionStore.chatViewModels[existingConversation.id] { viewModel.conversationId = item.id viewModel.isChannelConversation = updatedConversation.isChannelConversation - viewModel.ensureMessageLoopStarted() + viewModel.startChatEventSubscription() } return existingConversation.id } @@ -1462,7 +1464,7 @@ final class ConversationManager: ConversationRestorerDelegate { let viewModel = makeViewModel() viewModel.conversationId = item.id viewModel.isChannelConversation = conversationModel.isChannelConversation - viewModel.startMessageLoop() + viewModel.startChatEventSubscription() listStore.conversations.insert(conversationModel, at: 0) selectionStore.chatViewModels[conversationModel.id] = viewModel diff --git a/clients/macos/vellum-assistant/Features/MainWindow/ConversationRestorer.swift b/clients/macos/vellum-assistant/Features/MainWindow/ConversationRestorer.swift index 66bfeb21dec..430cb7e3705 100644 --- a/clients/macos/vellum-assistant/Features/MainWindow/ConversationRestorer.swift +++ b/clients/macos/vellum-assistant/Features/MainWindow/ConversationRestorer.swift @@ -131,7 +131,7 @@ final class ConversationRestorer { func startObserving(skipInitialFetch: Bool = false) { Task { @MainActor [weak self] in guard let self else { return } - for await message in self.eventStreamClient.subscribe() { + for await message in self.eventStreamClient.subscribeConversationListEvents() { switch message { case .conversationListResponse(let response): // SSE-pushed responses don't have the foreground/background diff --git a/clients/macos/vellum-assistant/Features/MainWindow/ConversationSelectionStore.swift b/clients/macos/vellum-assistant/Features/MainWindow/ConversationSelectionStore.swift index 8821818f443..233e4af0029 100644 --- a/clients/macos/vellum-assistant/Features/MainWindow/ConversationSelectionStore.swift +++ b/clients/macos/vellum-assistant/Features/MainWindow/ConversationSelectionStore.swift @@ -62,7 +62,7 @@ final class ConversationSelectionStore { activeConversation = listStore.conversationsByLocalId[conversationId] let vm = getOrCreateViewModel(for: conversationId) - vm?.ensureMessageLoopStarted() + vm?.startChatEventSubscription() onActiveConversationChanged?(conversationId) // Notify the daemon so it rebinds the socket to this conversation. @@ -391,7 +391,7 @@ final class ConversationSelectionStore { /// Returns an existing or newly-created ViewModel for a detached pop-out window. func viewModelForDetachedWindow(conversationLocalId: UUID) -> ChatViewModel? { let vm = getOrCreateViewModel(for: conversationLocalId) - vm?.ensureMessageLoopStarted() + vm?.startChatEventSubscription() return vm } diff --git a/clients/macos/vellum-assistant/Features/MainWindow/MainWindow.swift b/clients/macos/vellum-assistant/Features/MainWindow/MainWindow.swift index e2fc34df622..8659907a950 100644 --- a/clients/macos/vellum-assistant/Features/MainWindow/MainWindow.swift +++ b/clients/macos/vellum-assistant/Features/MainWindow/MainWindow.swift @@ -357,7 +357,7 @@ public final class MainWindow { documentManager.connectionManager = connectionManager Task { @MainActor [weak self] in guard let self else { return } - for await message in self.eventStreamClient.subscribe() { + for await message in self.eventStreamClient.subscribeTraceEvents() { switch message { case .traceEvent(let msg): self.traceStore.ingest(msg) diff --git a/clients/macos/vellum-assistant/Features/MainWindow/MainWindowView.swift b/clients/macos/vellum-assistant/Features/MainWindow/MainWindowView.swift index eb1bd31e965..cd308e85a9b 100644 --- a/clients/macos/vellum-assistant/Features/MainWindow/MainWindowView.swift +++ b/clients/macos/vellum-assistant/Features/MainWindow/MainWindowView.swift @@ -162,7 +162,7 @@ struct MainWindowView: View { // without an app relaunch. let homeStoreInstance = HomeStore( client: DefaultHomeStateClient(), - messageStream: eventStreamClient.subscribe() + messageStream: eventStreamClient.subscribeHomeEvents() ) self._homeStore = State(initialValue: homeStoreInstance) // Same eager-construction rationale for the activity feed store @@ -173,7 +173,7 @@ struct MainWindowView: View { // already looking at the feed. self._feedStore = State(initialValue: HomeFeedStore( client: DefaultHomeFeedClient(), - messageStream: eventStreamClient.subscribe(), + messageStream: eventStreamClient.subscribeHomeEvents(), onSSEUpdate: { [weak homeStoreInstance] in guard let homeStoreInstance else { return } if !homeStoreInstance.isHomeTabVisible { @@ -181,9 +181,9 @@ struct MainWindowView: View { } } )) - // Meet status panel subscribes to the same shared SSE stream. + // Meet status panel subscribes to the meet-domain dispatcher. self._meetStatusViewModel = State(initialValue: MeetStatusViewModel( - messageStream: eventStreamClient.subscribe() + messageStream: eventStreamClient.subscribeMeetEvents() )) } diff --git a/clients/macos/vellum-assistant/Features/MainWindow/ThreadWindowManager.swift b/clients/macos/vellum-assistant/Features/MainWindow/ThreadWindowManager.swift index 16f7540eb49..697cc6d58a9 100644 --- a/clients/macos/vellum-assistant/Features/MainWindow/ThreadWindowManager.swift +++ b/clients/macos/vellum-assistant/Features/MainWindow/ThreadWindowManager.swift @@ -74,7 +74,7 @@ final class ThreadWindowManager { ) threadWindows[conversationLocalId] = threadWindow - viewModel.ensureMessageLoopStarted() + viewModel.startChatEventSubscription() log.info("Opened thread window for \(conversationLocalId), \(self.threadWindows.count) total") return true diff --git a/clients/macos/vellum-assistant/Features/Settings/SettingsStore.swift b/clients/macos/vellum-assistant/Features/Settings/SettingsStore.swift index e69d77063bd..5d83330d25b 100644 --- a/clients/macos/vellum-assistant/Features/Settings/SettingsStore.swift +++ b/clients/macos/vellum-assistant/Features/Settings/SettingsStore.swift @@ -743,7 +743,7 @@ public final class SettingsStore: ObservableObject { // Subscribe to SSE-pushed config updates Task { @MainActor [weak self] in guard let self, let eventStreamClient = self.eventStreamClient else { return } - for await message in eventStreamClient.subscribe() { + for await message in eventStreamClient.subscribeSettingsEvents() { switch message { case .ingressConfigResponse(let response): self.handleIngressConfigResponse(response) diff --git a/clients/shared/Features/Chat/ChatViewModel.swift b/clients/shared/Features/Chat/ChatViewModel.swift index adac23f02ed..9c22a6a39ec 100644 --- a/clients/shared/Features/Chat/ChatViewModel.swift +++ b/clients/shared/Features/Chat/ChatViewModel.swift @@ -810,7 +810,6 @@ public final class ChatViewModel: MessageSendCoordinatorDelegate { public var conversationId: String? { didSet { - broadcastFilter.conversationId = conversationId // If the daemon reconnected before this VM had a conversation ID, a deferred // flush was requested. Now that we have a conversation, run it. if conversationId != nil && needsOfflineFlush { @@ -950,13 +949,12 @@ public final class ChatViewModel: MessageSendCoordinatorDelegate { /// (conversation_create sent, awaiting conversation_info). Used by ConversationManager /// to decide whether it's safe to release the VM on archive. public var isBootstrapping: Bool { bootstrapCorrelationId != nil } - @ObservationIgnored var messageLoopTask: Task? - /// Monotonically increasing ID used to distinguish successive message-loop - /// tasks so that a cancelled loop's cleanup doesn't clear a newer replacement. - @ObservationIgnored private var messageLoopGeneration: UInt64 = 0 - /// Mutable filter shared with EventStreamClient so conversation-scoped SSE - /// messages are only delivered to the matching subscriber. - @ObservationIgnored private let broadcastFilter = EventStreamClient.ConversationFilter() + /// Single chat-event subscription that lives for the lifetime of this VM. + /// Started in `init` via `subscribeChatEvents()` and torn down in `deinit`. + /// Replaces the legacy `startMessageLoop` / `messageLoopGeneration` + /// machinery, which restarted the subscription per turn and produced a + /// double-subscriber window during the cancel-and-resubscribe gap. + @ObservationIgnored var chatEventSubscriptionTask: Task? @ObservationIgnored var currentAssistantMessageId: UUID? /// The trimmed user text that initiated the current assistant turn. /// Used to tag the assistant message (e.g. modelList for "/models") without @@ -1417,6 +1415,13 @@ public final class ChatViewModel: MessageSendCoordinatorDelegate { // Initialize the action handler for server message dispatch. self.actionHandler = ChatActionHandler(viewModel: self) + // Subscribe to the typed chat-event dispatcher for the lifetime of + // this VM. The subscription does not restart per turn (which is what + // the legacy `startMessageLoop` did), so there is no window in which + // two overlapping subscribers can both apply the same event — one of + // the three root causes of the duplication bug this plan addresses. + startChatEventSubscription() + // Start consuming streaming events into the new MessageStore. As of // PR 4, the chat list renders from `renderedMessages` (which merges // the legacy `messages` array with snapshots from this store via @@ -1625,55 +1630,27 @@ public final class ChatViewModel: MessageSendCoordinatorDelegate { sendCoordinator.flushOfflineQueue() } - public func startMessageLoop() { - messageLoopTask?.cancel() - let messageStream = eventStreamClient.subscribe(filter: broadcastFilter) - - messageLoopGeneration &+= 1 - let generation = messageLoopGeneration - - messageLoopTask = Task { @MainActor [weak self] in - for await message in messageStream { + /// Start the chat-event subscription for the lifetime of this view model. + /// + /// Safe to call multiple times — no-ops once a subscription is already + /// active. The stream is owned by `EventStreamClient` and lives for as + /// long as the SSE connection (or this VM) does; we no longer tear it + /// down between turns. Mid-turn connection drops are recovered through + /// the `daemonDidReconnect` / `eventStreamDidReconnect` notifications, + /// which run `handleTransportReconnect()` — see that method for the + /// spinner/cursor/history-catch-up logic that used to live inline in + /// the per-turn loop restart. + public func startChatEventSubscription() { + guard chatEventSubscriptionTask == nil else { return } + let stream = eventStreamClient.subscribeChatEvents() + chatEventSubscriptionTask = Task { @MainActor [weak self] in + for await message in stream { guard let self, !Task.isCancelled else { break } self.handleServerMessage(message) } - // Stream ended (e.g. daemon disconnected) — clear the task reference - // so the next sendUserMessage() call will re-subscribe. - // Only nil out if this task is still the current one; a cancelled - // loop that finishes after its replacement must not wipe the new - // task reference, which would cause duplicate subscriptions. - if self?.messageLoopGeneration == generation { - self?.messageLoopTask = nil - // Reset spinner state — if the connection drops mid-turn the client - // never receives message_complete, leaving the UI stuck. - self?.isThinking = false - self?.isSending = false - self?.isCancelling = false - // Stream dropped mid-turn — `message_complete` won't arrive, - // so clear pending turns to avoid bumping - // `interactiveTurnCompletionTick` on the next turn. - self?.messageManager.pendingUserTurnCount = 0 - self?.messageManager.staleCancelEventsExpected = 0 - if let existingId = self?.currentAssistantMessageId { - self?.messages.finalizeStreamingMessage(id: existingId, completeToolCalls: .none) - } - self?.clearCurrentTurnTracking() - self?.discardStreamingBuffer() - self?.discardPartialOutputBuffer() - // If a send-direct was pending when the stream dropped, - // dispatch it now so the message isn't silently lost. - self?.dispatchPendingSendDirect() - } } } - /// Start the daemon message stream if this chat has a bound conversation and - /// no active loop yet. - public func ensureMessageLoopStarted() { - guard conversationId != nil, messageLoopTask == nil else { return } - startMessageLoop() - } - /// Send a message to the daemon without showing a user bubble in the chat. /// Used for automated actions like inline model picker selections. /// Returns `true` if the message was sent (or a conversation bootstrap was started), @@ -1777,10 +1754,10 @@ public final class ChatViewModel: MessageSendCoordinatorDelegate { suggestion = nil pendingSuggestionRequestId = nil - // Make sure we're listening for the response - if messageLoopTask == nil { - startMessageLoop() - } + // Make sure we're listening for the response. Idempotent — the + // chat-event subscription is started once at init and lives for + // the lifetime of the VM. + startChatEventSubscription() Task { let success = await regenerateClient.regenerate(conversationId: conversationId) @@ -2675,7 +2652,7 @@ public final class ChatViewModel: MessageSendCoordinatorDelegate { // Cancel all Combine subscriptions first so no new work can be scheduled // from incoming publisher events while the remaining cleanup runs. cancellables.removeAll() - messageLoopTask?.cancel() + chatEventSubscriptionTask?.cancel() streamingFlushTask?.cancel() partialOutputFlushTask?.cancel() cancelTimeoutTask?.cancel() diff --git a/clients/shared/Features/Chat/MessageSendCoordinator.swift b/clients/shared/Features/Chat/MessageSendCoordinator.swift index c68eb5e9a38..8b7cd592838 100644 --- a/clients/shared/Features/Chat/MessageSendCoordinator.swift +++ b/clients/shared/Features/Chat/MessageSendCoordinator.swift @@ -62,7 +62,6 @@ protocol MessageSendCoordinatorDelegate: AnyObject { // MARK: - Actions func flushCoalescedPublish() - func startMessageLoop() func refreshGuardianPrompts() func discardStreamingBuffer() func discardPartialOutputBuffer() @@ -74,7 +73,6 @@ protocol MessageSendCoordinatorDelegate: AnyObject { var onConversationCreated: ((String) -> Void)? { get } var onFirstUserMessage: ((String) -> Void)? { get set } var onUserMessageSent: (() -> Void)? { get } - var messageLoopTask: Task? { get } } /// Side-effect coordinator that owns the message send/cancel/queue logic. @@ -380,8 +378,8 @@ final class MessageSendCoordinator { } } - // Subscribe to daemon stream - delegate.startMessageLoop() + // The chat-event subscription was started at VM init and lives + // for the lifetime of this view model — no per-send restart. // Generate conversation ID locally — conversation creation is implicit // for HTTP transport. The conversationKey acts as the conversation. @@ -516,10 +514,8 @@ final class MessageSendCoordinator { messageManager.pendingUserTurnCount += 1 } - // Make sure we're listening - if delegate.messageLoopTask == nil { - delegate.startMessageLoop() - } + // The chat-event subscription is started once at VM init and lives + // for the VM lifetime — no per-send (re)subscribe. // Consume pending onboarding context on the first send so it's // included in the POST body. Nil it out immediately so subsequent diff --git a/clients/shared/Features/Chat/MessageStreamReducer.swift b/clients/shared/Features/Chat/MessageStreamReducer.swift index 398b28e35bd..dd10ca3d085 100644 --- a/clients/shared/Features/Chat/MessageStreamReducer.swift +++ b/clients/shared/Features/Chat/MessageStreamReducer.swift @@ -49,14 +49,14 @@ public final class MessageStreamReducer { subscriptionTask?.cancel() } - /// Subscribe to the `EventStreamClient` and feed every message into - /// `apply(event:)`. Safe to call multiple times — re-subscribes after - /// cancelling the previous task. + /// Subscribe to the chat event dispatcher and feed every relevant + /// message into `apply(event:)`. Safe to call multiple times — + /// re-subscribes after cancelling the previous task. public func start() { subscriptionTask?.cancel() subscriptionTask = Task { [weak self] in guard let self else { return } - let stream = self.eventStreamClient.subscribe() + let stream = self.eventStreamClient.subscribeChatEvents() for await message in stream { if Task.isCancelled { return } self.apply(event: message) diff --git a/clients/shared/Features/Contacts/ContactsStore.swift b/clients/shared/Features/Contacts/ContactsStore.swift index dacf5197829..c48fc4ee69c 100644 --- a/clients/shared/Features/Contacts/ContactsStore.swift +++ b/clients/shared/Features/Contacts/ContactsStore.swift @@ -125,7 +125,7 @@ public final class ContactsStore { subscriptionTask?.cancel() subscriptionTask = Task { [weak self] in guard let self else { return } - let stream = self.eventStreamClient.subscribe() + let stream = self.eventStreamClient.subscribeContactsEvents() for await message in stream { guard !Task.isCancelled else { return } diff --git a/clients/shared/Features/Directory/DirectoryStore.swift b/clients/shared/Features/Directory/DirectoryStore.swift index b4fec09a315..47f3e8b3c04 100644 --- a/clients/shared/Features/Directory/DirectoryStore.swift +++ b/clients/shared/Features/Directory/DirectoryStore.swift @@ -191,7 +191,7 @@ public final class DirectoryStore: ObservableObject { appFilesChangedTask?.cancel() appFilesChangedTask = Task { [weak self] in guard let eventStreamClient = self?.eventStreamClient else { return } - let stream = eventStreamClient.subscribe() + let stream = eventStreamClient.subscribeAppFilesEvents() for await message in stream { guard let self, !Task.isCancelled else { return } diff --git a/clients/shared/Network/EventStreamClient.swift b/clients/shared/Network/EventStreamClient.swift index 2165a60a532..17e66ede5d1 100644 --- a/clients/shared/Network/EventStreamClient.swift +++ b/clients/shared/Network/EventStreamClient.swift @@ -79,48 +79,167 @@ private final class SSEHandshakeCaptureDelegate: NSObject, URLSessionDataDelegat } } -/// Client that manages an SSE connection to the assistant runtime and broadcasts -/// parsed `ServerMessage` values to multiple independent subscribers. +/// Routing category for parsed `ServerMessage` events. Each consumer subscribes +/// to exactly one category via the typed `subscribeXxxEvents()` entry points; +/// `EventStreamClient` parses the SSE stream once and dispatches each event to +/// the consumers in the categories that care about its type. +/// +/// Replaces the legacy single-broadcaster `subscribe(filter:)` model that +/// delivered every event to every subscriber. The legacy model caused two +/// concrete problems this enum addresses: +/// +/// 1. **Double-subscriber window on loop restart.** `ChatViewModel` used to +/// tear down and re-create its subscription every turn (`startMessageLoop`); +/// if a streaming event landed during that window, it could be applied twice +/// by two overlapping subscribers. Per-domain dispatchers let each consumer +/// subscribe once at init for the lifetime of its owner — no restart, no +/// overlap. +/// 2. **Unbounded fan-out per subscriber.** The legacy `AsyncStream` per +/// subscriber used the default unbounded buffer. Each typed dispatcher now +/// runs with `.bufferingNewest(eventStreamSubscriberBufferLimit)` so a slow +/// consumer cannot pin arbitrary memory. +public enum EventStreamCategory: Sendable { + /// Streaming chat events consumed by `ChatViewModel`'s action handler and + /// `MessageStreamReducer` — message lifecycle, blocks, deltas, tool use, + /// queue / dequeue, generation handoff, errors, surfaces (which inline- + /// render inside chat), confirmations, subagent events, and assistant + /// status. The set is intentionally broad because chat is the primary + /// consumer of most server events. + case chat + /// Conversation list management — `conversation_list_response`, + /// `history_response`, `conversation_title_updated`, + /// `conversation_list_invalidated`. Consumed by `ConversationRestorer`. + case conversationList + /// Conversation orchestration — `conversation_id_resolved`, + /// `conversation_inference_profile_updated`, ACP session lifecycle. + /// Consumed by `ConversationManager`. + case conversationOrchestration + /// Trace + usage telemetry consumed by `MainWindow`. + case trace + /// Disk pressure status + feature-flag changes consumed by + /// `DiskPressureStatusStore`. + case diskPressure + /// `contacts_changed` invalidations consumed by `ContactsStore`. + case contacts + /// `app_files_changed` invalidations consumed by `DirectoryStore`. + case appFiles + /// Settings store events — currently `ingress_config_response` and + /// `telegram_config_response`. + case settings + /// Home tab events — `relationship_state_updated`, `home_feed_updated`. + case home + /// Meet status events — `meet_*`. + case meet + /// App-delegate level cross-domain orchestration: notifications, + /// open_url, open_conversation, document editor, recording control, + /// identity / avatar / sounds / config / feature flags, host tool + /// requests + cancels, signing identity, sync invalidation, surface + /// dismiss/complete for the surface manager, conversation errors, etc. + case appDelegate +} + +/// Bound for each typed dispatcher's per-subscriber buffer. The SSE loop runs +/// on @MainActor and consumers are also @MainActor, so under normal load this +/// is empty most of the time. The bound exists to cap memory if a consumer +/// hangs — newer events take precedence over older buffered ones because the +/// streaming reducer is idempotent and a stale event has no value once a newer +/// one of the same kind has been seen. +private let eventStreamSubscriberBufferLimit = 256 + +/// Client that manages an SSE connection to the assistant runtime and routes +/// parsed `ServerMessage` values to typed per-domain dispatchers. /// /// Backed by `GatewayHTTPClient.stream()` for authenticated SSE connections. @MainActor public final class EventStreamClient { - // MARK: - Broadcast Subscribers - - /// Mutable filter that a subscriber can update as its conversation changes. - /// Passed by reference so callers can set `conversationId` after subscribing - /// (e.g. when `conversationInfo` arrives and assigns the conversation ID). - public final class ConversationFilter: @unchecked Sendable { - public var conversationId: String? - public init(conversationId: String? = nil) { self.conversationId = conversationId } - } + // MARK: - Typed Dispatchers private struct Subscription { let continuation: AsyncStream.Continuation - let filter: ConversationFilter? } - private var subscribers: [UUID: Subscription] = [:] + /// Subscribers grouped by category. Each category has its own subscriber + /// list so a `chat` consumer's slowness can't backpressure a + /// `conversationList` consumer (they share no buffer). + private var subscribers: [EventStreamCategory: [UUID: Subscription]] = [:] - /// Creates a new message stream for the caller. - /// - /// - Parameter filter: Optional conversation filter. When provided, - /// messages whose `conversationId` doesn't match are not delivered, - /// reducing unnecessary subscriber wakeups. Messages with no - /// `conversationId` (system-level) are always delivered. - public func subscribe(filter: ConversationFilter? = nil) -> AsyncStream { + private func makeSubscription(for category: EventStreamCategory) -> AsyncStream { let id = UUID() - let (stream, continuation) = AsyncStream.makeStream() - subscribers[id] = Subscription(continuation: continuation, filter: filter) + let (stream, continuation) = AsyncStream.makeStream( + bufferingPolicy: .bufferingNewest(eventStreamSubscriberBufferLimit) + ) + var bucket = subscribers[category] ?? [:] + bucket[id] = Subscription(continuation: continuation) + subscribers[category] = bucket continuation.onTermination = { [weak self] _ in Task { @MainActor [weak self] in - self?.subscribers.removeValue(forKey: id) + guard let self else { return } + var bucket = self.subscribers[category] ?? [:] + bucket.removeValue(forKey: id) + self.subscribers[category] = bucket } } return stream } + // MARK: - Typed Subscribe Entry Points + + /// Subscribe to streaming chat events. See ``EventStreamCategory/chat``. + public func subscribeChatEvents() -> AsyncStream { + makeSubscription(for: .chat) + } + + /// Subscribe to conversation list management events. + public func subscribeConversationListEvents() -> AsyncStream { + makeSubscription(for: .conversationList) + } + + /// Subscribe to conversation orchestration events. + public func subscribeConversationOrchestrationEvents() -> AsyncStream { + makeSubscription(for: .conversationOrchestration) + } + + /// Subscribe to trace + usage telemetry events. + public func subscribeTraceEvents() -> AsyncStream { + makeSubscription(for: .trace) + } + + /// Subscribe to disk pressure + feature flag events. + public func subscribeDiskPressureEvents() -> AsyncStream { + makeSubscription(for: .diskPressure) + } + + /// Subscribe to contacts invalidation events. + public func subscribeContactsEvents() -> AsyncStream { + makeSubscription(for: .contacts) + } + + /// Subscribe to app-file change invalidations. + public func subscribeAppFilesEvents() -> AsyncStream { + makeSubscription(for: .appFiles) + } + + /// Subscribe to settings store events. + public func subscribeSettingsEvents() -> AsyncStream { + makeSubscription(for: .settings) + } + + /// Subscribe to home / feed events. + public func subscribeHomeEvents() -> AsyncStream { + makeSubscription(for: .home) + } + + /// Subscribe to meet status events. + public func subscribeMeetEvents() -> AsyncStream { + makeSubscription(for: .meet) + } + + /// Subscribe to the app-delegate orchestration stream. + public func subscribeAppDelegateEvents() -> AsyncStream { + makeSubscription(for: .appDelegate) + } + // MARK: - SSE State private var sseTask: Task? @@ -225,8 +344,10 @@ public final class EventStreamClient { func teardown() { shouldReconnect = false stopSSE() - for subscriber in subscribers.values { - subscriber.continuation.finish() + for bucket in subscribers.values { + for subscriber in bucket.values { + subscriber.continuation.finish() + } } subscribers.removeAll() } @@ -528,7 +649,6 @@ public final class EventStreamClient { // scheduled conversations — which never emit user_message_echo — // can't pollute the map with their conversationId during the window // between sendUserMessage and the HTTP 202 response. - var broadcastConversationId: String? if let conversationId = extractJsonStringValue(from: jsonString, key: "conversationId") { let eventType = extractJsonStringValue(from: jsonString, key: "type") let localId: String? @@ -545,7 +665,6 @@ public final class EventStreamClient { } else { localId = nil } - broadcastConversationId = localId ?? conversationId if let localId { jsonString = jsonString.replacingOccurrences( of: "\"conversationId\":\"\(conversationId)\"", @@ -618,7 +737,7 @@ public final class EventStreamClient { guard let message else { return } if shouldIgnoreHostToolRequest(message) { return } - handleParsedMessage(message, conversationId: broadcastConversationId) + handleParsedMessage(message) } private func shouldIgnoreHostToolRequest(_ message: ServerMessage) -> Bool { @@ -663,8 +782,8 @@ public final class EventStreamClient { /// Handle a successfully parsed server message: /// 1. Intercept token_rotated (update credentials, reconnect SSE) /// 2. Call pre-processor (DaemonStatus state updates) - /// 3. Broadcast to all subscribers - private func handleParsedMessage(_ message: ServerMessage, conversationId: String? = nil) { + /// 3. Route to typed dispatchers + private func handleParsedMessage(_ message: ServerMessage) { // Intercept token rotation — don't broadcast to subscribers if case .tokenRotated(let msg) = message { log.info("Received token_rotated event — reconnecting SSE") @@ -687,19 +806,144 @@ public final class EventStreamClient { } messagePreProcessor?(message) - broadcastMessage(message, conversationId: conversationId) + broadcastMessage(message) } - /// Broadcast a message to subscribers. When `conversationId` is provided, - /// subscribers with a non-matching conversation filter are skipped. - public func broadcastMessage(_ message: ServerMessage, conversationId: String? = nil) { - for subscriber in subscribers.values { - if let filterConvId = subscriber.filter?.conversationId, - let messageConvId = conversationId, - filterConvId != messageConvId { - continue + /// Dispatch a parsed event to every typed subscriber whose category + /// covers the event's kind. Public so tests and synthetic-event call + /// sites (e.g. `userMessagePersisted`) can fan a message into the + /// routing layer without going through the SSE parser. + public func broadcastMessage(_ message: ServerMessage) { + let categories = Self.categories(for: message) + for category in categories { + guard let bucket = subscribers[category] else { continue } + for subscriber in bucket.values { + subscriber.continuation.yield(message) } - subscriber.continuation.yield(message) + } + } + + /// Static category mapping. Each event type is routed to one or more + /// categories — for example, `appFilesChanged` goes to both `appFiles` + /// (DirectoryStore invalidation) and `chat` (ChatActionHandler's + /// surface-image refresh path). + /// + /// The mapping is deliberately conservative: when a category cares about + /// a class of events (e.g. chat events) we include the full set so a new + /// streaming-related event landing in this file doesn't silently bypass + /// the consumer that needs it. Cross-domain orchestration events (host + /// tools, document editor, recording, etc.) all flow through + /// `appDelegate`. + private static func categories(for message: ServerMessage) -> [EventStreamCategory] { + switch message { + // MARK: Chat streaming + lifecycle + case .messageOpen, .blockOpen, .blockClose, .messageClose, + .assistantTextDelta, .assistantThinkingDelta, .assistantActivityState, + .toolUseStart, .toolUsePreviewStart, .toolInputDelta, .toolOutputChunk, .toolResult, + .messageComplete, .messageQueued, .messageDequeued, .messageRequestComplete, + .messageQueuedDeleted, .messageSteered, .generationCancelled, .generationHandoff, + .userMessageEcho, .userMessagePersisted, .queuedMessageAcked, + .conversationInfo, .conversationError, .confirmationRequest, .confirmationStateChanged, + .undoComplete, .suggestionResponse, .error, + .watchStarted, .watchCompleteRequest, + .subagentSpawned, .subagentStatusChanged, .subagentEvent, + .contextCompacted, .compactionCircuitOpen, .compactionCircuitClosed, + .memoryStatus, .memoryRecalled, + .modelInfo, .guardianActionsPendingResponse, .turnProfileAutoRouted, + .usageProgress: + return [.chat] + + // MARK: UI surfaces — chat (inline) + app delegate (overlay surface manager) + case .uiSurfaceShow, .uiSurfaceUpdate, .uiSurfaceUndoResult: + return [.chat, .appDelegate] + case .uiSurfaceDismiss, .uiSurfaceComplete: + return [.chat, .appDelegate] + case .uiLayoutConfig: + return [.appDelegate] + + // MARK: Conversation list management + case .conversationListResponse, .historyResponse, + .conversationTitleUpdated, .conversationListInvalidated: + return [.conversationList] + + // MARK: Conversation orchestration + case .conversationIdResolved, .conversationInferenceProfileUpdated, + .acpSessionSpawned, .acpSessionUpdate, .acpSessionCompleted, .acpSessionError: + return [.conversationOrchestration] + + // MARK: Trace + usage telemetry + case .traceEvent: + return [.trace] + case .usageUpdate: + // Chat consumes for in-conversation usage attribution; the main + // window's trace category resets the dashboard. + return [.chat, .trace] + + // MARK: Disk pressure + case .diskPressureStatusChanged: + return [.diskPressure] + + // MARK: Feature flag changes — disk pressure (UI-impacting) + + // app delegate (kicks off reload). + case .featureFlagsChanged: + return [.diskPressure, .appDelegate] + + // MARK: Contacts invalidation + case .contactsChanged: + return [.contacts] + + // MARK: Workspace app files + case .appFilesChanged: + // DirectoryStore invalidates its app list; chat's action handler + // refreshes surface preview images for the active conversation; + // app delegate refreshes the in-memory apps cache. + return [.appFiles, .chat, .appDelegate] + + // MARK: Settings store + case .ingressConfigResponse, .telegramConfigResponse: + return [.settings] + + // MARK: Home / feed + case .relationshipStateUpdated, .homeFeedUpdated: + return [.home] + + // MARK: Meet status + case .meetJoining, .meetJoined, .meetLeft, .meetError, + .meetParticipantChanged, .meetSpeakerChanged, .meetTranscriptChunk, + .meetChatSent, .meetSpeakingStarted, .meetSpeakingEnded: + return [.meet] + + // MARK: App-delegate orchestration + case .notificationIntent, .notificationConversationCreated, + .openUrl, .openConversation, .navigateSettings, + .showPlatformLogin, .platformDisconnected, + .taskRunConversationCreated, .scheduleConversationCreated, + .heartbeatConversationCreated, + .documentEditorShow, .documentEditorUpdate, + .documentSaveResponse, .documentLoadResponse, + .recordingStart, .recordingStop, .recordingPause, .recordingResume, + .clientSettingsUpdate, .identityChanged, .avatarUpdated, + .soundsConfigUpdated, .configChanged, + .syncChanged, + .bookmarkCreated, .bookmarkDeleted, + .hostBashRequest, .hostBashCancel, + .hostFileRequest, .hostFileCancel, + .hostCuRequest, .hostCuCancel, + .hostAppControlRequest, .hostAppControlCancel, + .hostBrowserRequest, .hostBrowserCancel, + .hostTransferRequest, .hostTransferCancel, + .signBundlePayload, .getSigningIdentity, + .secretRequest, .contactRequest, + .skillStateChanged, + .serviceGroupUpdateStarting, .serviceGroupUpdateProgress, .serviceGroupUpdateComplete: + return [.appDelegate] + + // MARK: Unrouted — pre-handled by the parsed-message interceptor + // (`tokenRotated`) or pure on-demand request/response payloads that + // are consumed directly by their HTTP/IPC initiators rather than + // through SSE fan-out. + default: + return [] } } @@ -738,8 +982,10 @@ public final class EventStreamClient { tokenRotationTask?.cancel() sseReconnectTask?.cancel() sseTask?.cancel() - for subscriber in subscribers.values { - subscriber.continuation.finish() + for bucket in subscribers.values { + for subscriber in bucket.values { + subscriber.continuation.finish() + } } } } diff --git a/clients/shared/Tests/GatewayConnectionManagerTests.swift b/clients/shared/Tests/GatewayConnectionManagerTests.swift index 99c842be48d..f2a7764f86f 100644 --- a/clients/shared/Tests/GatewayConnectionManagerTests.swift +++ b/clients/shared/Tests/GatewayConnectionManagerTests.swift @@ -40,14 +40,15 @@ final class GatewayConnectionManagerTests: XCTestCase { func testSubscribeReturnsStream() { let client = GatewayConnectionManager() - let stream = client.eventStreamClient.subscribe() - // Simply verify subscribe() returns without crashing; stream is non-nil (value type) + let stream = client.eventStreamClient.subscribeChatEvents() + // Simply verify the typed dispatcher returns without crashing; stream + // is non-nil (value type). _ = stream } func testEmitDeliversToSubscriber() async { let client = GatewayConnectionManager() - let stream = client.eventStreamClient.subscribe() + let stream = client.eventStreamClient.subscribeChatEvents() // Collect one message from the stream let expectation = XCTestExpectation(description: "Subscriber receives emitted message") @@ -81,8 +82,8 @@ final class GatewayConnectionManagerTests: XCTestCase { func testEmitDeliversToMultipleSubscribers() async { let client = GatewayConnectionManager() - let stream1 = client.eventStreamClient.subscribe() - let stream2 = client.eventStreamClient.subscribe() + let stream1 = client.eventStreamClient.subscribeChatEvents() + let stream2 = client.eventStreamClient.subscribeChatEvents() let exp1 = XCTestExpectation(description: "Subscriber 1 receives message") let exp2 = XCTestExpectation(description: "Subscriber 2 receives message")