diff --git a/CLAUDE.md b/CLAUDE.md index 5a085373b0..f8363d85b3 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -134,7 +134,7 @@ web/ # Vue 3 + PrimeVue + Tailwind CSS dashboard src/ api/ # Axios client, endpoint modules, TypeScript types (mirrors backend Pydantic models) components/ # Vue components organized by feature (agents/, approvals/, budget/, common/, dashboard/, layout/, messages/, org-chart/, tasks/) - composables/ # Reusable composition functions (useAuth, useLoginLockout, usePolling, useOptimisticUpdate) + composables/ # Reusable composition functions (useAuth, useLoginLockout, usePolling, useOptimisticUpdate, useWebSocketSubscription) router/ # Vue Router config with auth guards stores/ # Pinia stores (auth, agents, tasks, budget, messages, meetings, approvals, websocket, analytics, company, providers) styles/ # Global CSS and PrimeVue theme configuration diff --git a/web/src/__tests__/composables/useWebSocketSubscription.test.ts b/web/src/__tests__/composables/useWebSocketSubscription.test.ts new file mode 100644 index 0000000000..1ea1977a08 --- /dev/null +++ b/web/src/__tests__/composables/useWebSocketSubscription.test.ts @@ -0,0 +1,307 @@ +import { describe, it, expect, vi, beforeEach, type Mock } from 'vitest' +import { setActivePinia, createPinia } from 'pinia' +import { onMounted, onUnmounted } from 'vue' + +// Mock Vue lifecycle hooks since we're not in a component context +vi.mock('vue', async () => { + const actual = await vi.importActual('vue') + return { + ...actual, + onMounted: vi.fn((cb: () => void) => cb()), + onUnmounted: vi.fn(), + } +}) + +import { useWebSocketSubscription } from '@/composables/useWebSocketSubscription' +import { useWebSocketStore } from '@/stores/websocket' +import { useAuthStore } from '@/stores/auth' +import type { WsEventHandler } from '@/api/types' + +describe('useWebSocketSubscription', () => { + let wsStore: ReturnType + let authStore: ReturnType + let consoleSpy: ReturnType + + beforeEach(() => { + setActivePinia(createPinia()) + wsStore = useWebSocketStore() + authStore = useAuthStore() + vi.clearAllMocks() + consoleSpy = vi.spyOn(console, 'error').mockImplementation(() => {}) + // Re-establish lifecycle mocks after clearAllMocks: + // - onMounted: synchronously invokes callback so setup logic runs during test + // - onUnmounted: no-op recorder; getUnmountCallback() reads from mock.calls + ;(onMounted as Mock).mockImplementation((cb: () => void) => cb()) + ;(onUnmounted as Mock).mockImplementation(() => {}) + }) + + function getUnmountCallback(): () => void { + const calls = (onUnmounted as Mock).mock.calls + expect(calls.length).toBeGreaterThan(0) + return calls[calls.length - 1][0] + } + + it('returns connected, reconnectExhausted, and setupError refs', () => { + const handler: WsEventHandler = vi.fn() + const result = useWebSocketSubscription({ + bindings: [{ channel: 'tasks', handler }], + }) + + expect(result.connected).toBeDefined() + expect(result.reconnectExhausted).toBeDefined() + expect(result.setupError).toBeDefined() + expect(result.connected.value).toBe(false) + expect(result.reconnectExhausted.value).toBe(false) + expect(result.setupError.value).toBeNull() + }) + + it('calls connect when auth token exists and not connected', () => { + const connectSpy = vi.spyOn(wsStore, 'connect') + authStore.$patch({ token: 'test-token' }) + + const handler: WsEventHandler = vi.fn() + useWebSocketSubscription({ + bindings: [{ channel: 'tasks', handler }], + }) + + expect(connectSpy).toHaveBeenCalledWith('test-token') + }) + + it('skips connect when already connected', () => { + const connectSpy = vi.spyOn(wsStore, 'connect') + authStore.$patch({ token: 'test-token' }) + wsStore.$patch({ connected: true }) + + const handler: WsEventHandler = vi.fn() + useWebSocketSubscription({ + bindings: [{ channel: 'tasks', handler }], + }) + + expect(connectSpy).not.toHaveBeenCalled() + }) + + it('skips all setup when no auth token', () => { + const connectSpy = vi.spyOn(wsStore, 'connect') + const subscribeSpy = vi.spyOn(wsStore, 'subscribe') + const onSpy = vi.spyOn(wsStore, 'onChannelEvent') + + const handler: WsEventHandler = vi.fn() + useWebSocketSubscription({ + bindings: [{ channel: 'tasks', handler }], + }) + + expect(connectSpy).not.toHaveBeenCalled() + expect(subscribeSpy).not.toHaveBeenCalled() + expect(onSpy).not.toHaveBeenCalled() + }) + + it('subscribes to deduplicated channels from bindings', () => { + const subscribeSpy = vi.spyOn(wsStore, 'subscribe') + authStore.$patch({ token: 'test-token' }) + const handler1: WsEventHandler = vi.fn() + const handler2: WsEventHandler = vi.fn() + + useWebSocketSubscription({ + bindings: [ + { channel: 'tasks', handler: handler1 }, + { channel: 'budget', handler: handler2 }, + ], + }) + + expect(subscribeSpy).toHaveBeenCalledWith(['tasks', 'budget'], undefined) + }) + + it('forwards filters to subscribe', () => { + const subscribeSpy = vi.spyOn(wsStore, 'subscribe') + authStore.$patch({ token: 'test-token' }) + const handler: WsEventHandler = vi.fn() + const filters = { project: 'test-project' } + + useWebSocketSubscription({ + bindings: [{ channel: 'tasks', handler }], + filters, + }) + + expect(subscribeSpy).toHaveBeenCalledWith(['tasks'], filters) + }) + + it('calls onChannelEvent for each binding', () => { + const onSpy = vi.spyOn(wsStore, 'onChannelEvent') + authStore.$patch({ token: 'test-token' }) + const handler1: WsEventHandler = vi.fn() + const handler2: WsEventHandler = vi.fn() + + useWebSocketSubscription({ + bindings: [ + { channel: 'tasks', handler: handler1 }, + { channel: 'budget', handler: handler2 }, + ], + }) + + expect(onSpy).toHaveBeenCalledTimes(2) + expect(onSpy).toHaveBeenCalledWith('tasks', handler1) + expect(onSpy).toHaveBeenCalledWith('budget', handler2) + }) + + it('deduplicates channels but wires both handlers for same channel', () => { + const subscribeSpy = vi.spyOn(wsStore, 'subscribe') + const onSpy = vi.spyOn(wsStore, 'onChannelEvent') + authStore.$patch({ token: 'test-token' }) + const handler1: WsEventHandler = vi.fn() + const handler2: WsEventHandler = vi.fn() + + useWebSocketSubscription({ + bindings: [ + { channel: 'tasks', handler: handler1 }, + { channel: 'tasks', handler: handler2 }, + ], + }) + + // Subscribe only lists channel once + expect(subscribeSpy).toHaveBeenCalledWith(['tasks'], undefined) + // Both handlers wired + expect(onSpy).toHaveBeenCalledTimes(2) + expect(onSpy).toHaveBeenCalledWith('tasks', handler1) + expect(onSpy).toHaveBeenCalledWith('tasks', handler2) + }) + + it('unsubscribes and removes handlers on unmount', () => { + const unsubscribeSpy = vi.spyOn(wsStore, 'unsubscribe') + const offSpy = vi.spyOn(wsStore, 'offChannelEvent') + authStore.$patch({ token: 'test-token' }) + const handler1: WsEventHandler = vi.fn() + const handler2: WsEventHandler = vi.fn() + + useWebSocketSubscription({ + bindings: [ + { channel: 'tasks', handler: handler1 }, + { channel: 'budget', handler: handler2 }, + ], + }) + + const unmount = getUnmountCallback() + unmount() + + expect(unsubscribeSpy).toHaveBeenCalledWith(['tasks', 'budget']) + expect(offSpy).toHaveBeenCalledTimes(2) + expect(offSpy).toHaveBeenCalledWith('tasks', handler1) + expect(offSpy).toHaveBeenCalledWith('budget', handler2) + }) + + it('sets setupError and logs when connect throws', () => { + vi.spyOn(wsStore, 'connect').mockImplementation(() => { + throw new Error('connection failed') + }) + authStore.$patch({ token: 'test-token' }) + + const handler: WsEventHandler = vi.fn() + const { setupError } = useWebSocketSubscription({ + bindings: [{ channel: 'tasks', handler }], + }) + + expect(setupError.value).toBe('WebSocket connection failed.') + expect(consoleSpy).toHaveBeenCalledWith( + 'WebSocket connect failed:', + 'connection failed', + expect.any(Error), + ) + }) + + it('skips subscribe and handler wiring when connect throws', () => { + const subscribeSpy = vi.spyOn(wsStore, 'subscribe') + const onSpy = vi.spyOn(wsStore, 'onChannelEvent') + vi.spyOn(wsStore, 'connect').mockImplementation(() => { + throw new Error('connection failed') + }) + authStore.$patch({ token: 'test-token' }) + + const handler: WsEventHandler = vi.fn() + useWebSocketSubscription({ + bindings: [{ channel: 'tasks', handler }], + }) + + expect(subscribeSpy).not.toHaveBeenCalled() + expect(onSpy).not.toHaveBeenCalled() + }) + + it('sets setupError and logs when subscribe throws', () => { + vi.spyOn(wsStore, 'subscribe').mockImplementation(() => { + throw new Error('subscribe failed') + }) + authStore.$patch({ token: 'test-token' }) + + const handler: WsEventHandler = vi.fn() + const { setupError } = useWebSocketSubscription({ + bindings: [{ channel: 'tasks', handler }], + }) + + expect(setupError.value).toBe('WebSocket subscription failed.') + expect(consoleSpy).toHaveBeenCalledWith( + 'WebSocket subscribe failed:', + 'subscribe failed', + expect.any(Error), + ) + }) + + it('still wires handlers when subscribe throws', () => { + const onSpy = vi.spyOn(wsStore, 'onChannelEvent') + vi.spyOn(wsStore, 'subscribe').mockImplementation(() => { + throw new Error('subscribe failed') + }) + authStore.$patch({ token: 'test-token' }) + + const handler: WsEventHandler = vi.fn() + useWebSocketSubscription({ + bindings: [{ channel: 'tasks', handler }], + }) + + expect(onSpy).toHaveBeenCalledWith('tasks', handler) + }) + + it('handles empty bindings array with token', () => { + const subscribeSpy = vi.spyOn(wsStore, 'subscribe') + const onSpy = vi.spyOn(wsStore, 'onChannelEvent') + authStore.$patch({ token: 'test-token' }) + + const result = useWebSocketSubscription({ bindings: [] }) + + expect(subscribeSpy).toHaveBeenCalledWith([], undefined) + expect(onSpy).not.toHaveBeenCalled() + expect(result.connected.value).toBe(false) + }) + + it('unmount cleanup runs safely after failed connect', () => { + const unsubscribeSpy = vi.spyOn(wsStore, 'unsubscribe') + const offSpy = vi.spyOn(wsStore, 'offChannelEvent') + vi.spyOn(wsStore, 'connect').mockImplementation(() => { + throw new Error('connection failed') + }) + authStore.$patch({ token: 'test-token' }) + + const handler: WsEventHandler = vi.fn() + useWebSocketSubscription({ + bindings: [{ channel: 'tasks', handler }], + }) + + const unmount = getUnmountCallback() + unmount() + + // Cleanup runs even though setup failed + expect(unsubscribeSpy).toHaveBeenCalledWith(['tasks']) + expect(offSpy).toHaveBeenCalledWith('tasks', handler) + }) + + it.each(['connected', 'reconnectExhausted'] as const)( + '%s ref reflects wsStore state', + (refName) => { + const handler: WsEventHandler = vi.fn() + const result = useWebSocketSubscription({ + bindings: [{ channel: 'tasks', handler }], + }) + + expect(result[refName].value).toBe(false) + wsStore.$patch({ [refName]: true }) + expect(result[refName].value).toBe(true) + }, + ) +}) diff --git a/web/src/api/types.ts b/web/src/api/types.ts index 5d7647958b..5b557a56c7 100644 --- a/web/src/api/types.ts +++ b/web/src/api/types.ts @@ -654,10 +654,13 @@ export interface WsEvent { payload: Record } +/** Filters for WebSocket channel subscriptions. */ +export type WsSubscriptionFilters = Readonly> + export interface WsSubscribeMessage { action: 'subscribe' channels: WsChannel[] - filters?: Record + filters?: WsSubscriptionFilters } export interface WsUnsubscribeMessage { diff --git a/web/src/composables/useWebSocketSubscription.ts b/web/src/composables/useWebSocketSubscription.ts new file mode 100644 index 0000000000..cad5fe0d0a --- /dev/null +++ b/web/src/composables/useWebSocketSubscription.ts @@ -0,0 +1,105 @@ +import { computed, onMounted, onUnmounted, ref, type ComputedRef } from 'vue' +import { useWebSocketStore } from '@/stores/websocket' +import { useAuthStore } from '@/stores/auth' +import { sanitizeForLog } from '@/utils/logging' +import type { WsChannel, WsEventHandler, WsSubscriptionFilters } from '@/api/types' + +/** A binding from a WebSocket channel to an event handler. */ +export interface ChannelBinding { + readonly channel: WsChannel + readonly handler: WsEventHandler +} + +/** Options for the useWebSocketSubscription composable. */ +export interface WebSocketSubscriptionOptions { + /** Channel-to-handler bindings. Each channel will be subscribed and its handler wired. */ + readonly bindings: readonly ChannelBinding[] + /** Optional filters passed to wsStore.subscribe(). */ + readonly filters?: WsSubscriptionFilters +} + +/** Return type exposing WebSocket connection and setup status. */ +export interface WebSocketSubscriptionReturn { + /** Whether the WebSocket is currently connected. */ + readonly connected: ComputedRef + /** Whether reconnection attempts have been exhausted. */ + readonly reconnectExhausted: ComputedRef + /** Non-null when WebSocket setup failed (connect or subscribe error). */ + readonly setupError: ComputedRef +} + +/** + * Manage WebSocket subscription lifecycle for a page view. + * + * Connects when an auth token is available and no connection is active, + * subscribes to deduplicated channels, and wires event handlers on mount. + * Automatically unsubscribes and removes handlers on unmount. + * + * Channels are deduplicated for subscription — multiple bindings on the same + * channel register multiple handlers but only one subscription. + * + * Setup errors (connect/subscribe failures) are caught and logged to console + * but do not throw. Monitor `setupError` for a user-facing error message, + * and `connected` / `reconnectExhausted` for connection health. + * + * When no auth token is present, all setup is skipped silently. + */ +export function useWebSocketSubscription( + options: WebSocketSubscriptionOptions, +): WebSocketSubscriptionReturn { + const wsStore = useWebSocketStore() + const authStore = useAuthStore() + const setupError = ref(null) + + const uniqueChannels: WsChannel[] = [...new Set(options.bindings.map((b) => b.channel))] + + onMounted(() => { + if (!authStore.token) return + + try { + if (!wsStore.connected) { + wsStore.connect(authStore.token) + } + } catch (err) { + setupError.value = 'WebSocket connection failed.' + console.error('WebSocket connect failed:', sanitizeForLog(err), err) + return + } + + try { + wsStore.subscribe(uniqueChannels, options.filters) + } catch (err) { + setupError.value = 'WebSocket subscription failed.' + console.error('WebSocket subscribe failed:', sanitizeForLog(err), err) + } + + for (const binding of options.bindings) { + try { + wsStore.onChannelEvent(binding.channel, binding.handler) + } catch (err) { + console.error('WebSocket handler wiring failed:', sanitizeForLog(err), err) + } + } + }) + + onUnmounted(() => { + try { + wsStore.unsubscribe(uniqueChannels) + } catch (err) { + console.error('WebSocket unsubscribe failed:', sanitizeForLog(err), err) + } + for (const binding of options.bindings) { + try { + wsStore.offChannelEvent(binding.channel, binding.handler) + } catch (err) { + console.error('WebSocket handler cleanup failed:', sanitizeForLog(err), err) + } + } + }) + + return { + connected: computed(() => wsStore.connected), + reconnectExhausted: computed(() => wsStore.reconnectExhausted), + setupError: computed(() => setupError.value), + } +} diff --git a/web/src/stores/websocket.ts b/web/src/stores/websocket.ts index d6b69f3d32..9b1f30ed90 100644 --- a/web/src/stores/websocket.ts +++ b/web/src/stores/websocket.ts @@ -1,6 +1,6 @@ import { defineStore } from 'pinia' import { ref } from 'vue' -import type { WsChannel, WsEvent, WsEventHandler } from '@/api/types' +import type { WsChannel, WsEvent, WsEventHandler, WsSubscriptionFilters } from '@/api/types' import { WS_RECONNECT_BASE_DELAY, WS_RECONNECT_MAX_DELAY, WS_MAX_RECONNECT_ATTEMPTS, WS_MAX_MESSAGE_SIZE } from '@/utils/constants' import { sanitizeForLog } from '@/utils/logging' @@ -152,7 +152,7 @@ export const useWebSocketStore = defineStore('websocket', () => { channelHandlers.clear() } - function subscribe(channels: WsChannel[], filters?: Record) { + function subscribe(channels: WsChannel[], filters?: WsSubscriptionFilters) { // Track as active subscription for auto-re-subscribe on reconnect const key = subscriptionKey(channels, filters) if (!activeSubscriptions.some((s) => subscriptionKey(s.channels, s.filters) === key)) { @@ -200,14 +200,14 @@ export const useWebSocketStore = defineStore('websocket', () => { } } - function onChannelEvent(channel: string, handler: WsEventHandler) { + function onChannelEvent(channel: WsChannel | '*', handler: WsEventHandler) { if (!channelHandlers.has(channel)) { channelHandlers.set(channel, new Set()) } channelHandlers.get(channel)!.add(handler) } - function offChannelEvent(channel: string, handler: WsEventHandler) { + function offChannelEvent(channel: WsChannel | '*', handler: WsEventHandler) { channelHandlers.get(channel)?.delete(handler) } diff --git a/web/src/views/AgentProfilesPage.vue b/web/src/views/AgentProfilesPage.vue index fcbbee627c..0e53e9571d 100644 --- a/web/src/views/AgentProfilesPage.vue +++ b/web/src/views/AgentProfilesPage.vue @@ -1,5 +1,5 @@