Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 49 additions & 25 deletions clients/macos/vellum-assistant/App/AppDelegate+Bootstrap.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import AppKit
import Combine
import VellumAssistantShared
import os

Expand Down Expand Up @@ -46,26 +47,41 @@ extension AppDelegate {
}
}

/// Polls connection state at ~0.5s intervals. Does NOT call
/// `connect()` itself — that is the sole responsibility of
/// `setupGatewayConnectionManager()`. This avoids a dual-connect race where two
/// concurrent Tasks both attempt `connectionManager.connect()`, with the
/// second caller's `disconnectInternal()` tearing down the first
/// caller's in-flight HTTP connection.
/// Waits for `connectionManager.isConnected` to become `true`, or until
/// the timeout expires — whichever comes first.
///
/// Does NOT call `connect()` itself — that is the sole responsibility of
/// `setupGatewayConnectionManager()`.
func awaitDaemonReady(timeout: TimeInterval) async -> Bool {
log.info("Waiting for assistant to become ready (timeout: \(timeout)s)")
let start = CFAbsoluteTimeGetCurrent()

while CFAbsoluteTimeGetCurrent() - start < timeout {
if connectionManager.isConnected {
log.info("Assistant is connected")
return true
if connectionManager.isConnected {
log.info("Assistant is connected")
return true
}

let connected = await withTaskGroup(of: Bool.self, returning: Bool.self) { group in
group.addTask { @MainActor [connectionManager = self.connectionManager] in
for await isConnected in connectionManager.isConnectedStream where isConnected {
return true
}
return false
}
group.addTask {
try? await Task.sleep(nanoseconds: UInt64(timeout * 1_000_000_000))
return false
}
try? await Task.sleep(nanoseconds: 500_000_000)
let result = await group.next() ?? false
group.cancelAll()
return result
}

log.warning("Assistant connection timed out after \(timeout)s")
return connectionManager.isConnected
if connected {
log.info("Assistant is connected")
} else {
log.warning("Assistant connection timed out after \(timeout)s")
}
return connected || connectionManager.isConnected
}

/// Waits for the local bootstrap to complete (`.localBootstrapCompleted` notification)
Expand Down Expand Up @@ -198,8 +214,9 @@ extension AppDelegate {
}
}

/// Performs the initial actor token bootstrap with fixed-interval polling.
/// Called only when no actor token exists (first launch or after credential wipe).
/// Performs the initial actor token bootstrap, reactively waiting for a
/// gateway connection before each attempt. Called only when no actor token
/// exists (first launch or after credential wipe).
///
/// Before hitting the network, checks whether the CLI already persisted a
/// guardian token to disk (e.g. during a Docker or cloud hatch). If found,
Expand All @@ -216,15 +233,12 @@ extension AppDelegate {
}

let deviceId = PairingQRCodeSheet.computeHostId()
let delay: UInt64 = 500_000_000
var connectionDelay: UInt64 = 500_000_000
let connectionMaxDelay: UInt64 = 10_000_000_000
let retryDelay: UInt64 = 500_000_000

while !Task.isCancelled {
guard connectionManager.isConnected else {
try? await Task.sleep(nanoseconds: connectionDelay)
connectionDelay = min(connectionDelay * 2, connectionMaxDelay)
continue
if !connectionManager.isConnected {
await awaitConnectionEstablished()
guard !Task.isCancelled else { return }
}

let success = await GuardianClient().bootstrapActorToken(
Expand All @@ -237,8 +251,18 @@ extension AppDelegate {
return
}

let jitter = UInt64.random(in: 0...(delay / 4))
try? await Task.sleep(nanoseconds: delay + jitter)
let jitter = UInt64.random(in: 0...(retryDelay / 4))
try? await Task.sleep(nanoseconds: retryDelay + jitter)
}
}

/// Suspends until `connectionManager.isConnected` becomes `true`,
/// or the task is cancelled.
@MainActor
private func awaitConnectionEstablished() async {
guard !connectionManager.isConnected else { return }
for await isConnected in connectionManager.isConnectedStream where isConnected {
return
}
}
}
19 changes: 19 additions & 0 deletions clients/shared/Network/GatewayConnectionManager.swift
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import Combine
import Foundation
import os

Expand Down Expand Up @@ -688,6 +689,24 @@ public final class GatewayConnectionManager: ObservableObject {
}
#endif

// MARK: - Async Observation

/// An `AsyncStream` that emits whenever `isConnected` changes.
///
/// Prefer this over `$isConnected.values` — Combine's `AsyncPublisher`
/// does not terminate on task cancellation, which can cause
/// `withTaskGroup` to hang indefinitely.
public var isConnectedStream: AsyncStream<Bool> {
AsyncStream { continuation in
let cancellable = self.$isConnected.sink { value in
continuation.yield(value)
}
continuation.onTermination = { _ in
cancellable.cancel()
}
}
}

// MARK: - Helpers

private func setConnected(_ connected: Bool) {
Expand Down