Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions clients/macos/Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ let package = Package(
.linkedFramework("Security"),
.linkedFramework("Speech"),
.linkedFramework("Vision"),
.linkedFramework("Network"),
]
),
.testTarget(
Expand Down
382 changes: 382 additions & 0 deletions clients/macos/vellum-assistant/IPC/DaemonClient.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,382 @@
import Foundation
import Network
import os

private let log = Logger(subsystem: Bundle.main.bundleIdentifier ?? "com.vellum.vellum-assistant", category: "DaemonClient")

/// Unix domain socket client for communicating with the Vellum daemon.
///
/// Connects to the daemon's socket at `~/.vellum/vellum.sock` (or `VELLUM_DAEMON_SOCKET` env override),
/// sends and receives newline-delimited JSON messages.
///
/// This is a long-lived singleton. The `messages` stream stays open for the app's lifetime.
/// Consumers (ComputerUseSession, AmbientAgent) filter for messages relevant to them.
@MainActor
final class DaemonClient: ObservableObject {

// MARK: - Published State

@Published var isConnected: Bool = false

// MARK: - Message Stream

/// Stream of incoming server messages. Stays open for the lifetime of the client.
var messages: AsyncStream<ServerMessage> {
stream
}
Comment on lines +24 to +26
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Single-consumer AsyncStream used as multi-consumer broadcast channel

The messages property (line 24-26) exposes a single AsyncStream<ServerMessage>, but the class documentation (line 13) states: "Consumers (ComputerUseSession, AmbientAgent) filter for messages relevant to them" — implying multiple consumers will iterate the same stream.

Why this is a problem

AsyncStream is a unicast stream: each yielded element is delivered to exactly one consumer. If two consumers (e.g., ComputerUseSession and AmbientAgent) both call for await message in daemonClient.messages, each message will be delivered to only one of them non-deterministically, rather than being broadcast to both.

For example, a cu_action message intended for ComputerUseSession might be consumed by AmbientAgent's iterator instead, and vice versa for ambient_result messages.

Additionally, each access to messages returns the same stream instance. AsyncStream can only be iterated once — a second for await on the same stream instance will immediately complete with no elements.

Impact: When the intended multiple consumers are connected, messages will be lost or misrouted. The second consumer to start iterating will receive no messages at all.

Prompt for agents
Replace the single AsyncStream with a broadcast mechanism that supports multiple consumers. Options include:

1. Use a subscriber registration pattern: maintain an array of AsyncStream.Continuation instances, one per consumer. Add a method like `subscribe() -> AsyncStream<ServerMessage>` that creates a new stream/continuation pair, stores the continuation, and returns the stream. In `handleServerMessage`, yield to all stored continuations. Clean up continuations when they are terminated.

2. Alternatively, use Combine's PassthroughSubject (since this is a macOS app) and expose a publisher instead of an AsyncStream, allowing multiple subscribers.

The key change is in `handleServerMessage` at line 286 (`continuation.yield(message)`) which needs to yield to ALL registered continuations, and the `messages` property (lines 24-26) which should be replaced with a `subscribe()` method that creates a fresh stream per consumer.
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.


// MARK: - Private State

private var connection: NWConnection?
private let queue = DispatchQueue(label: "com.vellum.vellum-assistant.daemon-client", qos: .userInitiated)

private let stream: AsyncStream<ServerMessage>
private let continuation: AsyncStream<ServerMessage>.Continuation

/// Buffer for accumulating incoming data until we have complete newline-delimited messages.
private var receiveBuffer = Data()

/// Maximum line size: 96 MB (for screenshots with base64).
private let maxLineSize = 96 * 1024 * 1024

/// Whether we should attempt to reconnect on disconnect.
private var shouldReconnect = true

/// Current reconnect backoff delay in seconds.
private var reconnectDelay: TimeInterval = 1.0

/// Maximum reconnect backoff delay.
private let maxReconnectDelay: TimeInterval = 30.0

/// Reconnect task handle.
private var reconnectTask: Task<Void, Never>?

/// Ping timer task handle.
private var pingTask: Task<Void, Never>?

/// Whether we're waiting for a pong response.
private var awaitingPong = false

/// Pong timeout task handle.
private var pongTimeoutTask: Task<Void, Never>?

private let encoder = JSONEncoder()
private let decoder = JSONDecoder()

// MARK: - Init

init() {
let (stream, continuation) = AsyncStream<ServerMessage>.makeStream()
self.stream = stream
self.continuation = continuation
Comment on lines +68 to +71
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Replace shared AsyncStream with a broadcast channel

This client exposes one shared AsyncStream instance to all subscribers, but AsyncStream is not multicast: concurrent iterators compete for elements, so messages will be split across consumers instead of each consumer seeing all messages. In the documented scenario where both ComputerUseSession and AmbientAgent listen and filter independently, one can consume and effectively hide messages meant for the other, causing dropped actions/results.

Useful? React with 👍 / 👎.

}

deinit {
// Cancel everything without triggering reconnect.
shouldReconnect = false
reconnectTask?.cancel()
pingTask?.cancel()
pongTimeoutTask?.cancel()
connection?.cancel()
continuation.finish()
}

// MARK: - Socket Path

/// Resolves the daemon socket path:
/// 1. `VELLUM_DAEMON_SOCKET` environment variable
/// 2. `~/.vellum/vellum.sock`
private static func resolveSocketPath() -> String {
if let envPath = ProcessInfo.processInfo.environment["VELLUM_DAEMON_SOCKET"], !envPath.isEmpty {
return envPath
}
return NSHomeDirectory() + "/.vellum/vellum.sock"
}

// MARK: - Connect

/// Connect to the daemon socket. If already connected, disconnects first.
func connect() async throws {
// Disconnect any existing connection without triggering reconnect.
disconnectInternal(triggerReconnect: false)

shouldReconnect = true
reconnectDelay = 1.0
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Exponential backoff is reset on every reconnect attempt, defeating the backoff mechanism

The connect() method unconditionally resets reconnectDelay = 1.0 at line 104 before attempting the connection. Since scheduleReconnect() calls connect() (line 320) after doubling the delay (line 317), the doubled value is immediately overwritten back to 1.0. This means every reconnect attempt uses a 1-second delay, and the exponential backoff never actually increases.

Detailed trace of the broken backoff
  1. Initial connect()reconnectDelay = 1.0 (line 104)
  2. Connection fails → stateUpdateHandler .failedscheduleReconnect()
  3. scheduleReconnect() captures delay = 1.0 (line 304), sleeps 1s, then sets reconnectDelay = min(1.0 * 2, 30) = 2.0 (line 317), then calls self.connect() (line 320)
  4. connect() immediately sets reconnectDelay = 1.0 (line 104) — overwriting the 2.0
  5. Connection fails again → scheduleReconnect() captures delay = 1.0 again

The backoff never progresses beyond 1 second. If the daemon is down for an extended period, the client will hammer the socket path every ~1 second indefinitely instead of backing off to 30 seconds.

Impact: When the daemon is unavailable, the client creates excessive connection attempts (one per second) instead of backing off exponentially, wasting CPU and potentially flooding logs.

Fix: Remove reconnectDelay = 1.0 from line 104. The reset on successful connection at line 129 (self.reconnectDelay = 1.0 inside the .ready handler) is sufficient and correct.

Suggested change
reconnectDelay = 1.0
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Comment on lines +103 to +104
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve reconnect backoff across retry attempts

connect() unconditionally resets reconnectDelay to 1.0, including when called from scheduleReconnect(), so the retry delay never actually ramps up beyond 1 second. During daemon outages this causes tight reconnect hammering rather than the intended exponential backoff (1s→30s), which can increase load and log churn and makes the backoff logic ineffective.

Useful? React with 👍 / 👎.


let socketPath = Self.resolveSocketPath()
log.info("Connecting to daemon socket at \(socketPath)")

let endpoint = NWEndpoint.unix(path: socketPath)
let parameters = NWParameters()
parameters.defaultProtocolStack.transportProtocol = NWProtocolTCP.Options()

let conn = NWConnection(to: endpoint, using: parameters)
self.connection = conn

try await withCheckedThrowingContinuation { (checkedContinuation: CheckedContinuation<Void, Error>) in
var resumed = false

conn.stateUpdateHandler = { [weak self] state in
guard let self else { return }

Task { @MainActor in
switch state {
case .ready:
if !resumed {
resumed = true
log.info("Connected to daemon socket")
self.isConnected = true
self.reconnectDelay = 1.0
self.startReceiveLoop()
self.startPingTimer()
checkedContinuation.resume()
}

case .failed(let error):
log.error("Connection failed: \(error.localizedDescription)")
self.isConnected = false
self.stopPingTimer()
if !resumed {
resumed = true
checkedContinuation.resume(throwing: error)
} else {
self.scheduleReconnect()
}

case .cancelled:
log.info("Connection cancelled")
self.isConnected = false
self.stopPingTimer()
if !resumed {
resumed = true
checkedContinuation.resume(throwing: NWError.posix(.ECANCELED))
}

case .waiting(let error):
log.warning("Connection waiting: \(error.localizedDescription)")
// Don't resume the continuation yet; NWConnection may still transition to .ready.

default:
break
}
}
}

conn.start(queue: self.queue)
}
}

// MARK: - Send

/// Send a message to the daemon. Fire-and-forget.
/// Encodes the message as JSON, appends a newline, and writes to the connection.
func send<T: Encodable>(_ message: T) throws {
guard let conn = connection else {
log.warning("Cannot send: not connected")
return
}

var data = try encoder.encode(message)
data.append(contentsOf: [0x0A]) // newline byte

conn.send(content: data, completion: .contentProcessed { error in
if let error {
log.error("Send failed: \(error.localizedDescription)")
}
})
}

// MARK: - Disconnect

/// Disconnect from the daemon. Stops reconnect and ping timers.
func disconnect() {
disconnectInternal(triggerReconnect: false)
}

private func disconnectInternal(triggerReconnect: Bool) {
shouldReconnect = triggerReconnect
reconnectTask?.cancel()
reconnectTask = nil
stopPingTimer()

if let conn = connection {
conn.stateUpdateHandler = nil
conn.cancel()
connection = nil
}

receiveBuffer = Data()
isConnected = false
}

// MARK: - Receive Loop

private func startReceiveLoop() {
guard let conn = connection else { return }
receiveData(on: conn)
}

private func receiveData(on conn: NWConnection) {
conn.receive(minimumIncompleteLength: 1, maximumLength: 65536) { [weak self] content, _, isComplete, error in
guard let self else { return }

Task { @MainActor in
if let data = content, !data.isEmpty {
self.processReceivedData(data)
}

if isComplete {
log.info("Connection received EOF")
self.handleUnexpectedDisconnect()
return
}

if let error {
log.error("Receive error: \(error.localizedDescription)")
self.handleUnexpectedDisconnect()
return
}

// Continue reading.
self.receiveData(on: conn)
}
}
}

/// Buffer incoming data, split on newlines, decode each complete line as ServerMessage.
private func processReceivedData(_ data: Data) {
receiveBuffer.append(data)

// Check max buffer size.
if receiveBuffer.count > maxLineSize {
log.error("Receive buffer exceeded max line size (\(self.maxLineSize) bytes), clearing buffer")
receiveBuffer = Data()
return
}

// Split on newlines.
let newline = UInt8(0x0A)
while let newlineIndex = receiveBuffer.firstIndex(of: newline) {
let lineData = receiveBuffer[receiveBuffer.startIndex..<newlineIndex]
receiveBuffer = receiveBuffer[(newlineIndex + 1)...]

// Skip empty lines.
guard !lineData.isEmpty else { continue }

do {
let message = try decoder.decode(ServerMessage.self, from: Data(lineData))
handleServerMessage(message)
} catch {
let lineString = String(data: Data(lineData), encoding: .utf8) ?? "<binary>"
let prefix = lineString.count > 200 ? String(lineString.prefix(200)) + "..." : lineString
log.error("Failed to decode server message: \(error.localizedDescription), line: \(prefix)")
}
}
}

private func handleServerMessage(_ message: ServerMessage) {
// Handle pong internally.
if case .pong = message {
awaitingPong = false
pongTimeoutTask?.cancel()
pongTimeoutTask = nil
}

// Yield all messages (including pong) to stream consumers.
continuation.yield(message)
}

// MARK: - Reconnect

private func handleUnexpectedDisconnect() {
disconnectInternal(triggerReconnect: shouldReconnect)
if shouldReconnect {
// Re-enable reconnect since disconnectInternal sets it based on the parameter.
self.shouldReconnect = true
scheduleReconnect()
}
}

private func scheduleReconnect() {
guard shouldReconnect else { return }
reconnectTask?.cancel()

let delay = reconnectDelay
log.info("Scheduling reconnect in \(delay)s")

reconnectTask = Task { @MainActor [weak self] in
do {
try await Task.sleep(nanoseconds: UInt64(delay * 1_000_000_000))
} catch {
return // Cancelled.
}

guard let self, self.shouldReconnect else { return }

// Increase backoff for next attempt.
self.reconnectDelay = min(self.reconnectDelay * 2, self.maxReconnectDelay)

do {
try await self.connect()
} catch {
log.error("Reconnect failed: \(error.localizedDescription)")
// connect() failure will trigger another scheduleReconnect via stateUpdateHandler
// only if we haven't already scheduled one.
if self.shouldReconnect && self.reconnectTask == nil {
self.scheduleReconnect()
}
}
}
}

// MARK: - Ping / Pong

private func startPingTimer() {
stopPingTimer()

pingTask = Task { @MainActor [weak self] in
while !Task.isCancelled {
do {
try await Task.sleep(nanoseconds: 30_000_000_000) // 30 seconds
} catch {
return // Cancelled.
}

guard let self, self.isConnected else { return }

self.sendPing()
}
}
}

private func stopPingTimer() {
pingTask?.cancel()
pingTask = nil
pongTimeoutTask?.cancel()
pongTimeoutTask = nil
awaitingPong = false
}

private func sendPing() {
do {
try send(PingMessage())
awaitingPong = true

// Start pong timeout.
pongTimeoutTask?.cancel()
pongTimeoutTask = Task { @MainActor [weak self] in
do {
try await Task.sleep(nanoseconds: 10_000_000_000) // 10 seconds
} catch {
return // Cancelled.
}

guard let self, self.awaitingPong else { return }
log.warning("Pong timeout, reconnecting")
self.handleUnexpectedDisconnect()
}
} catch {
log.error("Failed to send ping: \(error.localizedDescription)")
}
}
}