Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ extension AppDelegate {
eventSubscriptionTask?.cancel()
eventSubscriptionTask = Task { @MainActor [weak self] in
guard let self else { return }
let stream = self.eventStreamClient.subscribe()
let stream = self.eventStreamClient.subscribeAppDelegateEvents()
for await message in stream {
guard !Task.isCancelled else { break }
switch message {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ final class DiskPressureStatusStore {
guard eventTask == nil, let eventStreamClient else { return }
eventTask = Task { @MainActor [weak self] in
guard let self else { return }
for await message in eventStreamClient.subscribe() {
for await message in eventStreamClient.subscribeDiskPressureEvents() {
guard !Task.isCancelled else { break }
self.handle(message)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ enum HostCuExecutor {
on client: GatewayConnectionManager,
overlayProvider: @escaping @MainActor (_ conversationId: String, _ request: HostCuRequest) -> HostCuSessionProxy? = { _, _ in nil }
) {
// No-op: host CU requests are handled via EventStreamClient.subscribe() in AppDelegate.
// No-op: host CU requests are handled via EventStreamClient.subscribeAppDelegateEvents() in AppDelegate.
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ final class ConversationManager: ConversationRestorerDelegate {
}
Task { @MainActor [weak self] in
guard let self else { return }
for await message in self.eventStreamClient.subscribe() {
for await message in self.eventStreamClient.subscribeConversationOrchestrationEvents() {
switch message {
case .conversationIdResolved(let localId, let serverId):
self.resolveConversationId(from: localId, to: serverId)
Expand Down Expand Up @@ -744,7 +744,9 @@ final class ConversationManager: ConversationRestorerDelegate {
if markHistoryLoaded {
viewModel.isHistoryLoaded = true
}
viewModel.startMessageLoop()
// No-op when the VM was already initialized — chat-event subscription
// is started at VM init and runs for the VM lifetime.
viewModel.startChatEventSubscription()

listStore.conversations.insert(conversation, at: 0)
selectionStore.chatViewModels[conversation.id] = viewModel
Expand Down Expand Up @@ -1453,7 +1455,7 @@ final class ConversationManager: ConversationRestorerDelegate {
if let viewModel = selectionStore.chatViewModels[existingConversation.id] {
viewModel.conversationId = item.id
viewModel.isChannelConversation = updatedConversation.isChannelConversation
viewModel.ensureMessageLoopStarted()
viewModel.startChatEventSubscription()
}
return existingConversation.id
}
Expand All @@ -1462,7 +1464,7 @@ final class ConversationManager: ConversationRestorerDelegate {
let viewModel = makeViewModel()
viewModel.conversationId = item.id
viewModel.isChannelConversation = conversationModel.isChannelConversation
viewModel.startMessageLoop()
viewModel.startChatEventSubscription()

listStore.conversations.insert(conversationModel, at: 0)
selectionStore.chatViewModels[conversationModel.id] = viewModel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ final class ConversationRestorer {
func startObserving(skipInitialFetch: Bool = false) {
Task { @MainActor [weak self] in
guard let self else { return }
for await message in self.eventStreamClient.subscribe() {
for await message in self.eventStreamClient.subscribeConversationListEvents() {
switch message {
case .conversationListResponse(let response):
// SSE-pushed responses don't have the foreground/background
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ final class ConversationSelectionStore {
activeConversation = listStore.conversationsByLocalId[conversationId]

let vm = getOrCreateViewModel(for: conversationId)
vm?.ensureMessageLoopStarted()
vm?.startChatEventSubscription()
onActiveConversationChanged?(conversationId)

// Notify the daemon so it rebinds the socket to this conversation.
Expand Down Expand Up @@ -391,7 +391,7 @@ final class ConversationSelectionStore {
/// Returns an existing or newly-created ViewModel for a detached pop-out window.
func viewModelForDetachedWindow(conversationLocalId: UUID) -> ChatViewModel? {
let vm = getOrCreateViewModel(for: conversationLocalId)
vm?.ensureMessageLoopStarted()
vm?.startChatEventSubscription()
return vm
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ public final class MainWindow {
documentManager.connectionManager = connectionManager
Task { @MainActor [weak self] in
guard let self else { return }
for await message in self.eventStreamClient.subscribe() {
for await message in self.eventStreamClient.subscribeTraceEvents() {
switch message {
case .traceEvent(let msg):
self.traceStore.ingest(msg)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ struct MainWindowView: View {
// without an app relaunch.
let homeStoreInstance = HomeStore(
client: DefaultHomeStateClient(),
messageStream: eventStreamClient.subscribe()
messageStream: eventStreamClient.subscribeHomeEvents()
)
self._homeStore = State(initialValue: homeStoreInstance)
// Same eager-construction rationale for the activity feed store
Expand All @@ -173,17 +173,17 @@ struct MainWindowView: View {
// already looking at the feed.
self._feedStore = State(initialValue: HomeFeedStore(
client: DefaultHomeFeedClient(),
messageStream: eventStreamClient.subscribe(),
messageStream: eventStreamClient.subscribeHomeEvents(),
onSSEUpdate: { [weak homeStoreInstance] in
guard let homeStoreInstance else { return }
if !homeStoreInstance.isHomeTabVisible {
homeStoreInstance.flagUnseenChanges()
}
}
))
// Meet status panel subscribes to the same shared SSE stream.
// Meet status panel subscribes to the meet-domain dispatcher.
self._meetStatusViewModel = State(initialValue: MeetStatusViewModel(
messageStream: eventStreamClient.subscribe()
messageStream: eventStreamClient.subscribeMeetEvents()
))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ final class ThreadWindowManager {
)

threadWindows[conversationLocalId] = threadWindow
viewModel.ensureMessageLoopStarted()
viewModel.startChatEventSubscription()

log.info("Opened thread window for \(conversationLocalId), \(self.threadWindows.count) total")
return true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -743,7 +743,7 @@ public final class SettingsStore: ObservableObject {
// Subscribe to SSE-pushed config updates
Task { @MainActor [weak self] in
guard let self, let eventStreamClient = self.eventStreamClient else { return }
for await message in eventStreamClient.subscribe() {
for await message in eventStreamClient.subscribeSettingsEvents() {
switch message {
case .ingressConfigResponse(let response):
self.handleIngressConfigResponse(response)
Expand Down
89 changes: 33 additions & 56 deletions clients/shared/Features/Chat/ChatViewModel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,6 @@ public final class ChatViewModel: MessageSendCoordinatorDelegate {

public var conversationId: String? {
didSet {
broadcastFilter.conversationId = conversationId
// If the daemon reconnected before this VM had a conversation ID, a deferred
// flush was requested. Now that we have a conversation, run it.
if conversationId != nil && needsOfflineFlush {
Expand Down Expand Up @@ -950,13 +949,12 @@ public final class ChatViewModel: MessageSendCoordinatorDelegate {
/// (conversation_create sent, awaiting conversation_info). Used by ConversationManager
/// to decide whether it's safe to release the VM on archive.
public var isBootstrapping: Bool { bootstrapCorrelationId != nil }
@ObservationIgnored var messageLoopTask: Task<Void, Never>?
/// Monotonically increasing ID used to distinguish successive message-loop
/// tasks so that a cancelled loop's cleanup doesn't clear a newer replacement.
@ObservationIgnored private var messageLoopGeneration: UInt64 = 0
/// Mutable filter shared with EventStreamClient so conversation-scoped SSE
/// messages are only delivered to the matching subscriber.
@ObservationIgnored private let broadcastFilter = EventStreamClient.ConversationFilter()
/// Single chat-event subscription that lives for the lifetime of this VM.
/// Started in `init` via `subscribeChatEvents()` and torn down in `deinit`.
/// Replaces the legacy `startMessageLoop` / `messageLoopGeneration`
/// machinery, which restarted the subscription per turn and produced a
/// double-subscriber window during the cancel-and-resubscribe gap.
@ObservationIgnored var chatEventSubscriptionTask: Task<Void, Never>?
@ObservationIgnored var currentAssistantMessageId: UUID?
/// The trimmed user text that initiated the current assistant turn.
/// Used to tag the assistant message (e.g. modelList for "/models") without
Expand Down Expand Up @@ -1417,6 +1415,13 @@ public final class ChatViewModel: MessageSendCoordinatorDelegate {
// Initialize the action handler for server message dispatch.
self.actionHandler = ChatActionHandler(viewModel: self)

// Subscribe to the typed chat-event dispatcher for the lifetime of
// this VM. The subscription does not restart per turn (which is what
// the legacy `startMessageLoop` did), so there is no window in which
// two overlapping subscribers can both apply the same event — one of
// the three root causes of the duplication bug this plan addresses.
startChatEventSubscription()

// Start consuming streaming events into the new MessageStore. As of
// PR 4, the chat list renders from `renderedMessages` (which merges
// the legacy `messages` array with snapshots from this store via
Expand Down Expand Up @@ -1625,55 +1630,27 @@ public final class ChatViewModel: MessageSendCoordinatorDelegate {
sendCoordinator.flushOfflineQueue()
}

public func startMessageLoop() {
messageLoopTask?.cancel()
let messageStream = eventStreamClient.subscribe(filter: broadcastFilter)

messageLoopGeneration &+= 1
let generation = messageLoopGeneration

messageLoopTask = Task { @MainActor [weak self] in
for await message in messageStream {
/// Start the chat-event subscription for the lifetime of this view model.
///
/// Safe to call multiple times — no-ops once a subscription is already
/// active. The stream is owned by `EventStreamClient` and lives for as
/// long as the SSE connection (or this VM) does; we no longer tear it
/// down between turns. Mid-turn connection drops are recovered through
/// the `daemonDidReconnect` / `eventStreamDidReconnect` notifications,
/// which run `handleTransportReconnect()` — see that method for the
/// spinner/cursor/history-catch-up logic that used to live inline in
/// the per-turn loop restart.
public func startChatEventSubscription() {
guard chatEventSubscriptionTask == nil else { return }
let stream = eventStreamClient.subscribeChatEvents()
chatEventSubscriptionTask = Task { @MainActor [weak self] in
for await message in stream {
guard let self, !Task.isCancelled else { break }
self.handleServerMessage(message)
}
// Stream ended (e.g. daemon disconnected) — clear the task reference
// so the next sendUserMessage() call will re-subscribe.
// Only nil out if this task is still the current one; a cancelled
// loop that finishes after its replacement must not wipe the new
// task reference, which would cause duplicate subscriptions.
if self?.messageLoopGeneration == generation {
self?.messageLoopTask = nil
// Reset spinner state — if the connection drops mid-turn the client
// never receives message_complete, leaving the UI stuck.
self?.isThinking = false
self?.isSending = false
self?.isCancelling = false
// Stream dropped mid-turn — `message_complete` won't arrive,
// so clear pending turns to avoid bumping
// `interactiveTurnCompletionTick` on the next turn.
self?.messageManager.pendingUserTurnCount = 0
self?.messageManager.staleCancelEventsExpected = 0
if let existingId = self?.currentAssistantMessageId {
self?.messages.finalizeStreamingMessage(id: existingId, completeToolCalls: .none)
}
self?.clearCurrentTurnTracking()
self?.discardStreamingBuffer()
self?.discardPartialOutputBuffer()
// If a send-direct was pending when the stream dropped,
// dispatch it now so the message isn't silently lost.
self?.dispatchPendingSendDirect()
}
}
}

/// Start the daemon message stream if this chat has a bound conversation and
/// no active loop yet.
public func ensureMessageLoopStarted() {
guard conversationId != nil, messageLoopTask == nil else { return }
startMessageLoop()
}

/// Send a message to the daemon without showing a user bubble in the chat.
/// Used for automated actions like inline model picker selections.
/// Returns `true` if the message was sent (or a conversation bootstrap was started),
Expand Down Expand Up @@ -1777,10 +1754,10 @@ public final class ChatViewModel: MessageSendCoordinatorDelegate {
suggestion = nil
pendingSuggestionRequestId = nil

// Make sure we're listening for the response
if messageLoopTask == nil {
startMessageLoop()
}
// Make sure we're listening for the response. Idempotent — the
// chat-event subscription is started once at init and lives for
// the lifetime of the VM.
startChatEventSubscription()

Task {
let success = await regenerateClient.regenerate(conversationId: conversationId)
Expand Down Expand Up @@ -2675,7 +2652,7 @@ public final class ChatViewModel: MessageSendCoordinatorDelegate {
// Cancel all Combine subscriptions first so no new work can be scheduled
// from incoming publisher events while the remaining cleanup runs.
cancellables.removeAll()
messageLoopTask?.cancel()
chatEventSubscriptionTask?.cancel()
streamingFlushTask?.cancel()
partialOutputFlushTask?.cancel()
cancelTimeoutTask?.cancel()
Expand Down
12 changes: 4 additions & 8 deletions clients/shared/Features/Chat/MessageSendCoordinator.swift
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ protocol MessageSendCoordinatorDelegate: AnyObject {

// MARK: - Actions
func flushCoalescedPublish()
func startMessageLoop()
func refreshGuardianPrompts()
func discardStreamingBuffer()
func discardPartialOutputBuffer()
Expand All @@ -74,7 +73,6 @@ protocol MessageSendCoordinatorDelegate: AnyObject {
var onConversationCreated: ((String) -> Void)? { get }
var onFirstUserMessage: ((String) -> Void)? { get set }
var onUserMessageSent: (() -> Void)? { get }
var messageLoopTask: Task<Void, Never>? { get }
}

/// Side-effect coordinator that owns the message send/cancel/queue logic.
Expand Down Expand Up @@ -380,8 +378,8 @@ final class MessageSendCoordinator {
}
}

// Subscribe to daemon stream
delegate.startMessageLoop()
// The chat-event subscription was started at VM init and lives
// for the lifetime of this view model — no per-send restart.

// Generate conversation ID locally — conversation creation is implicit
// for HTTP transport. The conversationKey acts as the conversation.
Expand Down Expand Up @@ -516,10 +514,8 @@ final class MessageSendCoordinator {
messageManager.pendingUserTurnCount += 1
}

// Make sure we're listening
if delegate.messageLoopTask == nil {
delegate.startMessageLoop()
}
// The chat-event subscription is started once at VM init and lives
// for the VM lifetime — no per-send (re)subscribe.

// Consume pending onboarding context on the first send so it's
// included in the POST body. Nil it out immediately so subsequent
Expand Down
8 changes: 4 additions & 4 deletions clients/shared/Features/Chat/MessageStreamReducer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,14 @@ public final class MessageStreamReducer {
subscriptionTask?.cancel()
}

/// Subscribe to the `EventStreamClient` and feed every message into
/// `apply(event:)`. Safe to call multiple times — re-subscribes after
/// cancelling the previous task.
/// Subscribe to the chat event dispatcher and feed every relevant
/// message into `apply(event:)`. Safe to call multiple times —
/// re-subscribes after cancelling the previous task.
public func start() {
subscriptionTask?.cancel()
subscriptionTask = Task { [weak self] in
guard let self else { return }
let stream = self.eventStreamClient.subscribe()
let stream = self.eventStreamClient.subscribeChatEvents()
for await message in stream {
if Task.isCancelled { return }
self.apply(event: message)
Expand Down
2 changes: 1 addition & 1 deletion clients/shared/Features/Contacts/ContactsStore.swift
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public final class ContactsStore {
subscriptionTask?.cancel()
subscriptionTask = Task { [weak self] in
guard let self else { return }
let stream = self.eventStreamClient.subscribe()
let stream = self.eventStreamClient.subscribeContactsEvents()

for await message in stream {
guard !Task.isCancelled else { return }
Expand Down
2 changes: 1 addition & 1 deletion clients/shared/Features/Directory/DirectoryStore.swift
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ public final class DirectoryStore: ObservableObject {
appFilesChangedTask?.cancel()
appFilesChangedTask = Task { [weak self] in
guard let eventStreamClient = self?.eventStreamClient else { return }
let stream = eventStreamClient.subscribe()
let stream = eventStreamClient.subscribeAppFilesEvents()

for await message in stream {
guard let self, !Task.isCancelled else { return }
Expand Down
Loading