From 540b9935120580646b5599ac0637c4f56ad87447 Mon Sep 17 00:00:00 2001 From: Alex Nork Date: Tue, 10 Feb 2026 23:14:41 -0500 Subject: [PATCH] Add DaemonClient Unix domain socket IPC client (#494) Implements a @MainActor DaemonClient that connects to the Vellum daemon's Unix domain socket (~/.vellum/vellum.sock) using Network.framework's NWConnection. Features: - Newline-delimited JSON framing with buffered receive parsing - AsyncStream for consumers to filter relevant messages - Auto-reconnect with exponential backoff (1s-30s) - Ping/pong keepalive (30s interval, 10s timeout) - Fire-and-forget send with JSONEncoder - VELLUM_DAEMON_SOCKET env var override for socket path Co-Authored-By: Claude Opus 4.6 --- clients/macos/Package.swift | 1 + .../vellum-assistant/IPC/DaemonClient.swift | 382 ++++++++++++++++++ 2 files changed, 383 insertions(+) create mode 100644 clients/macos/vellum-assistant/IPC/DaemonClient.swift diff --git a/clients/macos/Package.swift b/clients/macos/Package.swift index b180249b922..8d2799122a3 100644 --- a/clients/macos/Package.swift +++ b/clients/macos/Package.swift @@ -34,6 +34,7 @@ let package = Package( .linkedFramework("Security"), .linkedFramework("Speech"), .linkedFramework("Vision"), + .linkedFramework("Network"), ] ), .testTarget( diff --git a/clients/macos/vellum-assistant/IPC/DaemonClient.swift b/clients/macos/vellum-assistant/IPC/DaemonClient.swift new file mode 100644 index 00000000000..fa8ccac9fbb --- /dev/null +++ b/clients/macos/vellum-assistant/IPC/DaemonClient.swift @@ -0,0 +1,382 @@ +import Foundation +import Network +import os + +private let log = Logger(subsystem: Bundle.main.bundleIdentifier ?? "com.vellum.vellum-assistant", category: "DaemonClient") + +/// Unix domain socket client for communicating with the Vellum daemon. +/// +/// Connects to the daemon's socket at `~/.vellum/vellum.sock` (or `VELLUM_DAEMON_SOCKET` env override), +/// sends and receives newline-delimited JSON messages. 
///
/// This is a long-lived singleton. The `messages` stream stays open for the app's lifetime.
/// Consumers (ComputerUseSession, AmbientAgent) filter for messages relevant to them.
///
/// All mutable state is confined to the main actor. Network.framework callbacks are delivered
/// on a private dispatch queue and hop to the main actor before touching any of it.
@MainActor
final class DaemonClient: ObservableObject {

    // MARK: - Published State

    /// `true` from the moment the connection reports `.ready` until it is torn down.
    @Published var isConnected: Bool = false

    // MARK: - Message Stream

    /// Stream of incoming server messages. Stays open for the lifetime of the client.
    var messages: AsyncStream<ServerMessage> {
        stream
    }

    // MARK: - Private State

    /// The current socket connection, or `nil` when disconnected.
    private var connection: NWConnection?

    /// Queue on which Network.framework delivers state and receive callbacks.
    private let queue = DispatchQueue(label: "com.vellum.vellum-assistant.daemon-client", qos: .userInitiated)

    private let stream: AsyncStream<ServerMessage>
    private let continuation: AsyncStream<ServerMessage>.Continuation

    /// Continuation of the connect attempt currently in flight, if any.
    /// It is cleared before being resumed so it can never be resumed twice.
    private var pendingConnect: CheckedContinuation<Void, Error>?

    /// Buffer for accumulating incoming data until we have complete newline-delimited messages.
    private var receiveBuffer = Data()

    /// Maximum line size: 96 MB (for screenshots with base64).
    private let maxLineSize = 96 * 1024 * 1024

    /// Whether we should attempt to reconnect on disconnect.
    private var shouldReconnect = true

    /// Backoff delay used for the first reconnect attempt after a successful connection.
    private let initialReconnectDelay: TimeInterval = 1.0

    /// Current reconnect backoff delay in seconds.
    private var reconnectDelay: TimeInterval = 1.0

    /// Maximum reconnect backoff delay.
    private let maxReconnectDelay: TimeInterval = 30.0

    /// Reconnect task handle. Only set while the task is sleeping out its backoff delay.
    private var reconnectTask: Task<Void, Never>?

    /// Ping timer task handle.
    private var pingTask: Task<Void, Never>?

    /// Whether we're waiting for a pong response.
    private var awaitingPong = false

    /// Pong timeout task handle.
    private var pongTimeoutTask: Task<Void, Never>?

    private let encoder = JSONEncoder()
    private let decoder = JSONDecoder()

    // MARK: - Init

    init() {
        let (stream, continuation) = AsyncStream<ServerMessage>.makeStream()
        self.stream = stream
        self.continuation = continuation
    }

    deinit {
        // Cancel everything without triggering reconnect.
        shouldReconnect = false
        reconnectTask?.cancel()
        pingTask?.cancel()
        pongTimeoutTask?.cancel()
        connection?.cancel()
        continuation.finish()
    }

    // MARK: - Socket Path

    /// Resolves the daemon socket path:
    /// 1. `VELLUM_DAEMON_SOCKET` environment variable (when set and non-empty)
    /// 2. `~/.vellum/vellum.sock`
    private static func resolveSocketPath() -> String {
        if let envPath = ProcessInfo.processInfo.environment["VELLUM_DAEMON_SOCKET"], !envPath.isEmpty {
            return envPath
        }
        return NSHomeDirectory() + "/.vellum/vellum.sock"
    }

    // MARK: - Connect

    /// Connect to the daemon socket. If already connected, disconnects first.
    ///
    /// Re-enables auto-reconnect and restarts the backoff sequence from its initial delay.
    ///
    /// - Throws: The `NWError` reported by the connection if the attempt fails or stalls in
    ///   `.waiting`, or `NWError.posix(.ECANCELED)` if the attempt is superseded by another
    ///   `connect()` or by `disconnect()`.
    func connect() async throws {
        // Only an explicit connect restarts the backoff; automatic retries must not, otherwise
        // the delay would be reset before every attempt and never grow.
        reconnectDelay = initialReconnectDelay
        try await openConnection()
    }

    /// Opens a fresh connection and suspends until it is ready or has failed.
    /// Shared by `connect()` and the reconnect task; leaves `reconnectDelay` untouched.
    private func openConnection() async throws {
        // Disconnect any existing connection without triggering reconnect. This also fails any
        // connect attempt that is still in flight.
        disconnectInternal(triggerReconnect: false)
        shouldReconnect = true

        let socketPath = Self.resolveSocketPath()
        log.info("Connecting to daemon socket at \(socketPath)")

        let endpoint = NWEndpoint.unix(path: socketPath)
        let parameters = NWParameters()
        parameters.defaultProtocolStack.transportProtocol = NWProtocolTCP.Options()

        let conn = NWConnection(to: endpoint, using: parameters)
        connection = conn

        // `conn` is captured weakly so the handler it stores does not keep it alive.
        conn.stateUpdateHandler = { [weak self, weak conn] state in
            Task { @MainActor [weak self, weak conn] in
                // Ignore events from a connection that has since been replaced or torn down.
                guard let self, let conn, self.connection === conn else { return }
                self.handleStateUpdate(state)
            }
        }

        try await withCheckedThrowingContinuation { (checkedContinuation: CheckedContinuation<Void, Error>) in
            // Register the continuation before starting so no state update can miss it.
            self.pendingConnect = checkedContinuation
            conn.start(queue: self.queue)
        }
    }

    /// Reacts to a state change of the current connection.
    private func handleStateUpdate(_ state: NWConnection.State) {
        switch state {
        case .ready:
            // Only the transition that completes a pending connect starts the loops.
            guard pendingConnect != nil else { return }
            log.info("Connected to daemon socket")
            isConnected = true
            reconnectDelay = initialReconnectDelay
            startReceiveLoop()
            startPingTimer()
            finishPendingConnect(with: .success(()))

        case .failed(let error):
            log.error("Connection failed: \(error.localizedDescription)")
            if pendingConnect != nil {
                // Failed while connecting: report to the caller, who decides whether to retry.
                failPendingConnect(with: error)
            } else {
                // Failed after being established: tear down and retry with backoff.
                handleUnexpectedDisconnect()
            }

        case .waiting(let error):
            log.warning("Connection waiting: \(error.localizedDescription)")
            // A connection to a local socket parks in `.waiting` when the socket is missing or
            // refuses the connection. Treat that as a failed attempt so the caller (or the
            // reconnect backoff) can retry instead of suspending indefinitely.
            if pendingConnect != nil {
                failPendingConnect(with: error)
            }

        case .cancelled:
            log.info("Connection cancelled")
            isConnected = false
            stopPingTimer()
            finishPendingConnect(with: .failure(NWError.posix(.ECANCELED)))

        case .setup, .preparing:
            break

        @unknown default:
            break
        }
    }

    /// Resumes the in-flight connect attempt, if there is one, exactly once.
    private func finishPendingConnect(with result: Result<Void, Error>) {
        guard let pending = pendingConnect else { return }
        pendingConnect = nil
        pending.resume(with: result)
    }

    /// Releases the failed connection and reports `error` to the in-flight connect attempt.
    /// Leaves `shouldReconnect` and the reconnect task untouched.
    private func failPendingConnect(with error: Error) {
        tearDownConnection()
        finishPendingConnect(with: .failure(error))
    }

    // MARK: - Send

    /// Send a message to the daemon. Fire-and-forget.
    /// Encodes the message as JSON, appends a newline, and writes to the connection.
    ///
    /// When there is no connection the message is dropped and a warning is logged. Write
    /// failures are logged, not thrown.
    ///
    /// - Parameter message: The value to encode and send.
    /// - Throws: Any error thrown by `JSONEncoder` while encoding `message`.
    func send<T: Encodable>(_ message: T) throws {
        guard let conn = connection else {
            log.warning("Cannot send: not connected")
            return
        }

        var data = try encoder.encode(message)
        data.append(0x0A) // newline byte

        conn.send(content: data, completion: .contentProcessed { error in
            if let error {
                log.error("Send failed: \(error.localizedDescription)")
            }
        })
    }

    // MARK: - Disconnect

    /// Disconnect from the daemon. Stops reconnect and ping timers.
    /// A connect attempt still in flight fails with `NWError.posix(.ECANCELED)`.
    func disconnect() {
        disconnectInternal(triggerReconnect: false)
    }

    /// Tears down the connection and cancels any scheduled reconnect.
    ///
    /// - Parameter triggerReconnect: New value for `shouldReconnect`. This method never
    ///   schedules a reconnect itself.
    private func disconnectInternal(triggerReconnect: Bool) {
        shouldReconnect = triggerReconnect
        reconnectTask?.cancel()
        reconnectTask = nil

        tearDownConnection()

        // Never leave a connect caller suspended once its connection is gone.
        finishPendingConnect(with: .failure(NWError.posix(.ECANCELED)))
    }

    /// Cancels the socket and resets per-connection state (timers, buffer, `isConnected`).
    private func tearDownConnection() {
        stopPingTimer()

        if let conn = connection {
            conn.stateUpdateHandler = nil
            conn.cancel()
            connection = nil
        }

        receiveBuffer = Data()
        isConnected = false
    }

    // MARK: - Receive Loop

    private func startReceiveLoop() {
        guard let conn = connection else { return }
        receiveData(on: conn)
    }

    /// Issues one receive on `conn` and re-arms itself after each chunk has been processed.
    private func receiveData(on conn: NWConnection) {
        conn.receive(minimumIncompleteLength: 1, maximumLength: 65536) { [weak self] content, _, isComplete, error in
            Task { @MainActor [weak self] in
                // A callback from a connection that was already replaced or torn down must not
                // touch the current connection's state.
                guard let self, self.connection === conn else { return }

                if let data = content, !data.isEmpty {
                    self.processReceivedData(data)
                }

                if isComplete {
                    log.info("Connection received EOF")
                    self.handleUnexpectedDisconnect()
                    return
                }

                if let error {
                    log.error("Receive error: \(error.localizedDescription)")
                    self.handleUnexpectedDisconnect()
                    return
                }

                // Continue reading.
                self.receiveData(on: conn)
            }
        }
    }

    /// Buffer incoming data, split on newlines, decode each complete line as ServerMessage.
    private func processReceivedData(_ data: Data) {
        // Bytes already in the buffer were searched when they arrived and hold no newline, so
        // the search only needs to cover the new chunk. This keeps very long lines linear.
        var searchStart = receiveBuffer.endIndex
        receiveBuffer.append(data)

        // Check max buffer size.
        if receiveBuffer.count > maxLineSize {
            log.error("Receive buffer exceeded max line size (\(self.maxLineSize) bytes), clearing buffer")
            receiveBuffer = Data()
            return
        }

        // Split on newlines.
        let newline = UInt8(0x0A)
        var lineStart = receiveBuffer.startIndex
        while let newlineIndex = receiveBuffer[searchStart...].firstIndex(of: newline) {
            let lineData = receiveBuffer[lineStart..<newlineIndex]
            lineStart = receiveBuffer.index(after: newlineIndex)
            searchStart = lineStart

            // Blank lines carry no message.
            if !lineData.isEmpty {
                decodeLine(lineData)
            }
        }

        // Drop everything consumed; only the trailing partial line stays buffered.
        receiveBuffer.removeSubrange(receiveBuffer.startIndex..<lineStart)
    }

    /// Decodes one complete line and dispatches it. A line that fails to decode is logged
    /// (truncated to 200 characters) and skipped.
    private func decodeLine(_ lineData: Data) {
        do {
            let message = try decoder.decode(ServerMessage.self, from: lineData)
            handleServerMessage(message)
        } catch {
            let lineString = String(data: lineData, encoding: .utf8) ?? "<non-utf8>"
            let prefix = lineString.count > 200 ? String(lineString.prefix(200)) + "..." : lineString
            log.error("Failed to decode server message: \(error.localizedDescription), line: \(prefix)")
        }
    }

    private func handleServerMessage(_ message: ServerMessage) {
        // Handle pong internally.
        if case .pong = message {
            awaitingPong = false
            pongTimeoutTask?.cancel()
            pongTimeoutTask = nil
        }

        // Yield all messages (including pong) to stream consumers.
        continuation.yield(message)
    }

    // MARK: - Reconnect

    /// Tears down a connection that ended on its own (EOF, receive error, failure, pong
    /// timeout) and schedules a reconnect if auto-reconnect is still enabled.
    private func handleUnexpectedDisconnect() {
        let reconnect = shouldReconnect
        disconnectInternal(triggerReconnect: reconnect)
        if reconnect {
            scheduleReconnect()
        }
    }

    /// Schedules one reconnect attempt after the current backoff delay, replacing any attempt
    /// that is already scheduled. The delay doubles per attempt up to `maxReconnectDelay`.
    private func scheduleReconnect() {
        guard shouldReconnect else { return }
        reconnectTask?.cancel()

        let delay = reconnectDelay
        log.info("Scheduling reconnect in \(delay)s")

        reconnectTask = Task { @MainActor [weak self] in
            do {
                try await Task.sleep(nanoseconds: UInt64(delay * 1_000_000_000))
            } catch {
                return // Cancelled.
            }

            guard let self, self.shouldReconnect else { return }

            // The delay has elapsed, so this task is now the active attempt. Clear the handle
            // so opening the connection does not cancel the task that is running it.
            self.reconnectTask = nil

            // Increase backoff for next attempt.
            self.reconnectDelay = min(self.reconnectDelay * 2, self.maxReconnectDelay)

            do {
                try await self.openConnection()
            } catch {
                log.error("Reconnect failed: \(error.localizedDescription)")
                // Retry only if this attempt failed on its own. If a newer connection exists or
                // another reconnect is already scheduled, this attempt was superseded.
                if self.shouldReconnect && self.connection == nil && self.reconnectTask == nil {
                    self.scheduleReconnect()
                }
            }
        }
    }

    // MARK: - Ping / Pong

    /// Starts a task that sends a ping every 30 seconds while connected.
    private func startPingTimer() {
        stopPingTimer()

        pingTask = Task { @MainActor [weak self] in
            while !Task.isCancelled {
                do {
                    try await Task.sleep(nanoseconds: 30_000_000_000) // 30 seconds
                } catch {
                    return // Cancelled.
                }

                guard let self, self.isConnected else { return }

                self.sendPing()
            }
        }
    }

    /// Cancels the ping timer and any outstanding pong timeout.
    private func stopPingTimer() {
        pingTask?.cancel()
        pingTask = nil
        pongTimeoutTask?.cancel()
        pongTimeoutTask = nil
        awaitingPong = false
    }

    /// Sends a ping and arms a 10-second timeout. If no pong arrives in time the connection
    /// is treated as dead and torn down.
    private func sendPing() {
        do {
            try send(PingMessage())
            awaitingPong = true

            // Start pong timeout.
            pongTimeoutTask?.cancel()
            pongTimeoutTask = Task { @MainActor [weak self] in
                do {
                    try await Task.sleep(nanoseconds: 10_000_000_000) // 10 seconds
                } catch {
                    return // Cancelled.
                }

                guard let self, self.awaitingPong else { return }
                log.warning("Pong timeout, reconnecting")
                self.handleUnexpectedDisconnect()
            }
        } catch {
            log.error("Failed to send ping: \(error.localizedDescription)")
        }
    }
}