From 772b471b891648e799697c5d5465248b7a9b3556 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 29 May 2026 21:33:04 +0000 Subject: [PATCH 1/5] refactor(web): decentralize cross-domain SSE event handlers into bus subscribers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move 8 cross-domain event handlers from the monolithic use-stream-event-handler.ts dispatch into domain-scoped bus subscribers: - identity_changed, avatar_updated, relationship_state_updated, home_feed_updated → useAssistantResourceSync - disk_pressure_status_changed → useDiskPressureMonitor - notification_intent → new useNotificationIntentSync - document_editor_update → new useDocumentEditorSync - conversation_title_updated → useConversationSync Bug fix: relationship_state_updated now invalidates both HOME_FEED_QUERY_KEY_PREFIX and HOME_STATE_QUERY_KEY_PREFIX. Previously only the monolithic handler invalidated home-state; the bus subscriber missed it. Shrinks StreamHandlerContext by 3 fields, deletes home-handlers.ts entirely, removes 5 functions from metadata-handlers.ts. Closes LUM-2053 Co-Authored-By: ashlee@vellum.ai --- .../assistant/use-disk-pressure-monitor.ts | 10 +++ apps/web/src/domains/chat/chat-page.tsx | 3 - .../use-stream-event-handler-guard.test.tsx | 22 ++--- .../chat/hooks/use-stream-event-handler.ts | 89 ++++--------------- .../utils/stream-handlers/home-handlers.ts | 20 ----- .../stream-handlers/metadata-handlers.test.ts | 57 +----------- .../stream-handlers/metadata-handlers.ts | 86 ------------------ .../utils/stream-handlers/test-helpers.ts | 3 - .../chat/utils/stream-handlers/types.ts | 8 -- .../use-conversation-sync.test.tsx | 25 ++++++ .../conversations/use-conversation-sync.ts | 37 +++++--- .../home/hooks/use-home-state-query.ts | 7 +- .../use-assistant-resource-sync.test.tsx | 64 +++++++++++-- .../src/hooks/use-assistant-resource-sync.ts | 41 +++++++-- .../web/src/hooks/use-document-editor-sync.ts | 27 ++++++ .../src/hooks/use-notification-intent-sync.ts | 75 ++++++++++++++++ apps/web/src/lib/sync/query-tags.ts | 6 ++ apps/web/src/root-layout.tsx | 4 + 18 files changed, 287 insertions(+), 297 deletions(-) delete mode 100644 apps/web/src/domains/chat/utils/stream-handlers/home-handlers.ts create mode 100644 apps/web/src/hooks/use-document-editor-sync.ts create mode 100644 apps/web/src/hooks/use-notification-intent-sync.ts diff --git a/apps/web/src/assistant/use-disk-pressure-monitor.ts b/apps/web/src/assistant/use-disk-pressure-monitor.ts index 535adcae885..92fc5955ac8 100644 --- a/apps/web/src/assistant/use-disk-pressure-monitor.ts +++ b/apps/web/src/assistant/use-disk-pressure-monitor.ts @@ -12,6 +12,7 @@ import { getDiskPressureMonitorMode, type DiskPressureMonitorMode, } from "@/assistant/disk-pressure"; +import { useBusSubscription } from "@/hooks/use-bus-subscription"; import { useEventBusStore } from "@/stores/event-bus-store"; export interface UseDiskPressureMonitorOptions { @@ -204,6 +205,15 @@ export function useDiskPressureMonitor({ [assistantId, applyStatusForAssistant, clearStatus, enabled], ); + // React to daemon-pushed disk pressure events via the event bus. + // Complements the polling interval and resume-refresh above so + // status changes are reflected immediately without waiting for + // the next poll tick. + useBusSubscription("sse.event", (event) => { + if (event.type !== "disk_pressure_status_changed") return; + applyStatusEvent(event.status); + }); + const acknowledge = useCallback(async () => { const requestedAssistantId = assistantId; diff --git a/apps/web/src/domains/chat/chat-page.tsx b/apps/web/src/domains/chat/chat-page.tsx index 89c9dfdaf17..fec18e95d91 100644 --- a/apps/web/src/domains/chat/chat-page.tsx +++ b/apps/web/src/domains/chat/chat-page.tsx @@ -646,9 +646,6 @@ export function ChatPage() { setContextWindowUsage, scheduleConversationListRefetch, setCompactionCircuitOpenUntil, - applyDiskPressureStatusEvent: diskPressure.applyStatusEvent, - refreshAssistantIdentity, - invalidateAvatar, dispatchSyncChanged, pendingQueuedMessageIdsRef, requestIdToMessageIdRef, diff --git a/apps/web/src/domains/chat/hooks/use-stream-event-handler-guard.test.tsx b/apps/web/src/domains/chat/hooks/use-stream-event-handler-guard.test.tsx index 7225787888f..8a8448515b4 100644 --- a/apps/web/src/domains/chat/hooks/use-stream-event-handler-guard.test.tsx +++ b/apps/web/src/domains/chat/hooks/use-stream-event-handler-guard.test.tsx @@ -28,18 +28,6 @@ mock.module( }, }), ); -mock.module( - "@/domains/chat/utils/stream-handlers/home-handlers", - () => ({ - handleHomeFeedUpdated: () => { - handlerCalls.push({ kind: "home_feed_updated" }); - }, - handleRelationshipStateUpdated: () => { - handlerCalls.push({ kind: "relationship_state_updated" }); - }, - }), -); - const { useStreamEventHandler } = await import( "@/domains/chat/hooks/use-stream-event-handler" ); @@ -116,9 +104,6 @@ function renderHandler( setContextWindowUsage: () => {}, scheduleConversationListRefetch: () => {}, setCompactionCircuitOpenUntil: () => {}, - applyDiskPressureStatusEvent: () => {}, - refreshAssistantIdentity: async () => {}, - invalidateAvatar: () => {}, dispatchSyncChanged: () => {}, } as never), { wrapper }, @@ -218,7 +203,7 @@ describe("handleStreamEvent — defense-in-depth conversation routing guard", () expect(true).toBe(true); }); - test("forwards a global event (home_feed_updated) regardless of conversation context", () => { + test("no-ops a cross-domain event (home_feed_updated) handled by bus subscribers", () => { const refs = noopRefs(); const { handleStreamEvent } = renderHandler(refs, { streamConversationId: null, @@ -230,7 +215,10 @@ describe("handleStreamEvent — defense-in-depth conversation routing guard", () } as unknown as AssistantEvent, 0, ); - expect(handlerCalls).toContainEqual({ kind: "home_feed_updated" }); + // home_feed_updated is now handled by useAssistantResourceSync (bus + // subscriber), not the monolithic handler. The switch case is a no-op. + const homeFeedCalls = handlerCalls.filter((c) => c.kind === "home_feed_updated"); + expect(homeFeedCalls).toHaveLength(0); }); test("rejects events whose epoch is stale (regardless of conversation key)", () => { diff --git a/apps/web/src/domains/chat/hooks/use-stream-event-handler.ts b/apps/web/src/domains/chat/hooks/use-stream-event-handler.ts index c43a214e484..eb125068bfb 100644 --- a/apps/web/src/domains/chat/hooks/use-stream-event-handler.ts +++ b/apps/web/src/domains/chat/hooks/use-stream-event-handler.ts @@ -14,14 +14,9 @@ import type { DisplayMessage } from "@/domains/chat/utils/reconcile"; import { tailIsStreamingAssistant } from "@/domains/chat/hooks/stream-message-updaters"; import { useTurnStore } from "@/domains/chat/turn-store"; import { endTurn } from "@/domains/chat/turn-coordinator"; -import { useViewerStore } from "@/stores/viewer-store"; -import type { DiskPressureStatusEventPayload } from "@/assistant/use-disk-pressure-monitor"; + import { recordDiagnostic, summarizeAssistantEvent } from "@/lib/diagnostics"; import { isConversationScopedStreamEvent } from "@/domains/chat/utils/chat"; -import { - handleHomeFeedUpdated, - handleRelationshipStateUpdated, -} from "@/domains/chat/utils/stream-handlers/home-handlers"; import { handleOpenUrl, handleNavigateSettings, @@ -56,13 +51,8 @@ import { } from "@/domains/chat/utils/stream-handlers/tool-call-handlers"; import { handleUsageUpdate, - handleConversationTitleUpdated, - handleNotificationIntent, handleCompactionCircuitOpen, handleCompactionCircuitClosed, - handleDiskPressureStatusChanged, - handleIdentityChanged, - handleAvatarUpdated, handleTurnProfileAutoRouted, } from "@/domains/chat/utils/stream-handlers/metadata-handlers"; import { @@ -140,12 +130,7 @@ export interface UseStreamEventHandlerParams { // --- Compaction --- setCompactionCircuitOpenUntil: Dispatch>; - // --- External callbacks (stabilized via refs in the hook) --- - applyDiskPressureStatusEvent: ( - payload: DiskPressureStatusEventPayload, - ) => void; - refreshAssistantIdentity: (force?: boolean) => Promise; - invalidateAvatar: () => void; + // --- Sync router --- dispatchSyncChanged: (event: AssistantSyncChangedEvent) => void; // --- Queue management --- @@ -195,9 +180,6 @@ export function useStreamEventHandler( setContextWindowUsage, scheduleConversationListRefetch, setCompactionCircuitOpenUntil, - applyDiskPressureStatusEvent, - refreshAssistantIdentity, - invalidateAvatar, dispatchSyncChanged, pendingQueuedMessageIdsRef, requestIdToMessageIdRef, @@ -209,18 +191,6 @@ export function useStreamEventHandler( const toolCallIdCounterRef = useRef(0); const currentAssistantMessageIdRef = useRef(undefined); - // Stabilize external callbacks that may not be memoized upstream. - // Storing them in refs keeps handleStreamEvent's identity stable across - // renders while always calling the latest version of each callback. - // Reference: https://react.dev/reference/react/useCallback#preventing-an-effect-from-firing-too-often - const applyDiskPressureStatusEventRef = useRef(applyDiskPressureStatusEvent); - applyDiskPressureStatusEventRef.current = applyDiskPressureStatusEvent; - const refreshAssistantIdentityRef = useRef(refreshAssistantIdentity); - refreshAssistantIdentityRef.current = refreshAssistantIdentity; - const invalidateAvatarRef = useRef(invalidateAvatar); - invalidateAvatarRef.current = invalidateAvatar; - - // --- Main event handler --- const handleStreamEvent = useCallback( @@ -311,12 +281,6 @@ export function useStreamEventHandler( scheduleConversationListRefetch, queryClient, setCompactionCircuitOpenUntil, - applyDiskPressureStatusEvent: (...args) => - applyDiskPressureStatusEventRef.current(...args), - refreshAssistantIdentity: (...args) => - refreshAssistantIdentityRef.current(...args), - invalidateAvatar: (...args) => - invalidateAvatarRef.current(...args), pendingQueuedMessageIdsRef, requestIdToMessageIdRef, pendingLocalDeletionsRef, @@ -410,27 +374,14 @@ export function useStreamEventHandler( // to the Electron client and `conversation_list_invalidated` // is retired from the event types entirely. break; - case "conversation_title_updated": - handleConversationTitleUpdated(event, ctx); - break; - case "notification_intent": - handleNotificationIntent(event, ctx); - break; + case "compaction_circuit_open": handleCompactionCircuitOpen(event, ctx); break; case "compaction_circuit_closed": handleCompactionCircuitClosed(event, ctx); break; - case "disk_pressure_status_changed": - handleDiskPressureStatusChanged(event, ctx); - break; - case "identity_changed": - handleIdentityChanged(event, ctx); - break; - case "avatar_updated": - handleAvatarUpdated(event, ctx); - break; + case "turn_profile_auto_routed": handleTurnProfileAutoRouted(event, ctx); break; @@ -446,12 +397,7 @@ export function useStreamEventHandler( case "message_request_complete": handleMessageRequestComplete(event, ctx); break; - case "home_feed_updated": - handleHomeFeedUpdated(queryClient, event); - break; - case "relationship_state_updated": - handleRelationshipStateUpdated(queryClient, event); - break; + case "subagent_spawned": handleSubagentSpawned(event, ctx); break; @@ -464,21 +410,25 @@ export function useStreamEventHandler( case "sync_changed": dispatchSyncChanged(event); break; + + // Cross-domain events handled by bus subscribers mounted in + // RootLayout (useAssistantResourceSync, useConversationSync, + // useNotificationIntentSync, useDocumentEditorSync) or + // ChatPage-scoped hooks (useDiskPressureMonitor). The chat + // handler is intentionally a no-op for these. + case "home_feed_updated": + case "relationship_state_updated": + case "identity_changed": + case "avatar_updated": + case "disk_pressure_status_changed": + case "notification_intent": case "document_editor_update": - useViewerStore.getState().updateDocumentContent( - event.surfaceId, - event.markdown, - event.mode, - ); - break; + case "conversation_title_updated": case "document_comment_created": case "document_comment_resolved": case "document_comment_reopened": case "document_comment_deleted": case "interaction_resolved": - // Attention reconciliation lives in `useAttentionTracking`, which - // subscribes to the event bus directly. The chat-stream handler - // is intentionally a no-op here. break; case "unknown": break; @@ -497,9 +447,6 @@ export function useStreamEventHandler( // Stable deps listed for correctness — React guarantees identity // stability for state setters and refs, so these never trigger // re-creation of the callback. - // Note: applyDiskPressureStatusEvent, refreshAssistantIdentity, and - // invalidateAvatar are accessed via refs (stable identity) and are - // intentionally excluded from this dep array. dispatchSyncChanged, queryClient, streamEpochRef, diff --git a/apps/web/src/domains/chat/utils/stream-handlers/home-handlers.ts b/apps/web/src/domains/chat/utils/stream-handlers/home-handlers.ts deleted file mode 100644 index 9fecf1d0de5..00000000000 --- a/apps/web/src/domains/chat/utils/stream-handlers/home-handlers.ts +++ /dev/null @@ -1,20 +0,0 @@ -import type { QueryClient } from "@tanstack/react-query"; -import type { - HomeFeedUpdatedEvent, - RelationshipStateUpdatedEvent, -} from "@vellumai/assistant-api"; -import { HOME_FEED_QUERY_KEY_PREFIX } from "@/lib/sync/query-tags"; - -export function handleHomeFeedUpdated( - queryClient: QueryClient, - _event: HomeFeedUpdatedEvent, -): void { - queryClient.invalidateQueries({ queryKey: [HOME_FEED_QUERY_KEY_PREFIX] }); -} - -export function handleRelationshipStateUpdated( - queryClient: QueryClient, - _event: RelationshipStateUpdatedEvent, -): void { - queryClient.invalidateQueries({ queryKey: ["home-state"] }); -} diff --git a/apps/web/src/domains/chat/utils/stream-handlers/metadata-handlers.test.ts b/apps/web/src/domains/chat/utils/stream-handlers/metadata-handlers.test.ts index d11bf9b8fa1..dfcb95e2d12 100644 --- a/apps/web/src/domains/chat/utils/stream-handlers/metadata-handlers.test.ts +++ b/apps/web/src/domains/chat/utils/stream-handlers/metadata-handlers.test.ts @@ -1,16 +1,11 @@ import { describe, expect, it } from "bun:test"; import { makeCtx } from "@/domains/chat/utils/stream-handlers/test-helpers"; -import { conversationsQueryKey } from "@/domains/conversations/conversation-queries"; -import type { Conversation } from "@/types/conversation-types"; + import { handleUsageUpdate, - handleConversationTitleUpdated, handleCompactionCircuitOpen, handleCompactionCircuitClosed, - handleDiskPressureStatusChanged, - handleIdentityChanged, - handleAvatarUpdated, } from "@/domains/chat/utils/stream-handlers/metadata-handlers"; describe("handleUsageUpdate", () => { @@ -71,31 +66,6 @@ describe("handleUsageUpdate", () => { }); }); -describe("handleConversationTitleUpdated", () => { - it("patches conversation title in the conversations query cache", () => { - const ctx = makeCtx(); - ctx.queryClient.setQueryData( - conversationsQueryKey(ctx.assistantIdRef.current), - [{ conversationId: "conv-1", title: "Old Title" } as Conversation], - ); - - handleConversationTitleUpdated( - { - type: "conversation_title_updated", - conversationId: "conv-1", - title: "New Title", - }, - ctx, - ); - - const cached = ctx.queryClient.getQueryData( - conversationsQueryKey(ctx.assistantIdRef.current), - ); - const conv = cached?.find((c) => c.conversationId === "conv-1"); - expect(conv?.title).toBe("New Title"); - }); -}); - describe("handleCompactionCircuitOpen", () => { it("sets compaction circuit open until date", () => { const ctx = makeCtx(); @@ -123,29 +93,4 @@ describe("handleCompactionCircuitClosed", () => { }); }); -describe("handleDiskPressureStatusChanged", () => { - it("delegates to applyDiskPressureStatusEvent", () => { - const ctx = makeCtx(); - handleDiskPressureStatusChanged( - { type: "disk_pressure_status_changed", status: null }, - ctx, - ); - expect(ctx.applyDiskPressureStatusEvent).toHaveBeenCalledWith(null); - }); -}); -describe("handleIdentityChanged", () => { - it("calls refreshAssistantIdentity with force=true", () => { - const ctx = makeCtx(); - handleIdentityChanged({ type: "identity_changed" }, ctx); - expect(ctx.refreshAssistantIdentity).toHaveBeenCalledWith(true); - }); -}); - -describe("handleAvatarUpdated", () => { - it("calls invalidateAvatar", () => { - const ctx = makeCtx(); - handleAvatarUpdated({ type: "avatar_updated" }, ctx); - expect(ctx.invalidateAvatar).toHaveBeenCalled(); - }); -}); diff --git a/apps/web/src/domains/chat/utils/stream-handlers/metadata-handlers.ts b/apps/web/src/domains/chat/utils/stream-handlers/metadata-handlers.ts index b2127bdf0b3..2a6826fc0ce 100644 --- a/apps/web/src/domains/chat/utils/stream-handlers/metadata-handlers.ts +++ b/apps/web/src/domains/chat/utils/stream-handlers/metadata-handlers.ts @@ -1,26 +1,14 @@ import type { ContextWindowUsage } from "@/domains/chat/components/context-window-indicator"; import { saveContextWindowUsage } from "@/domains/chat/utils/context-window-storage"; -import { - extractConversationId, - postLocalNotification, - sendNotificationIntentAck, -} from "@/runtime/notifications"; -import { patchConversation } from "@/domains/conversations/conversation-queries"; import type { StreamHandlerContext } from "@/domains/chat/utils/stream-handlers/types"; import type { CompactionCircuitClosedEvent, CompactionCircuitOpenEvent, } from "@vellumai/assistant-api"; import type { - AvatarUpdatedEvent, - ConversationTitleUpdatedEvent, - DiskPressureStatusChangedEvent, - IdentityChangedEvent, - NotificationIntentEvent, TurnProfileAutoRoutedEvent, UsageUpdateEvent, } from "@/types/event-types"; -import { useConversationStore } from "@/stores/conversation-store"; export function handleUsageUpdate( event: UsageUpdateEvent, @@ -56,59 +44,6 @@ export function handleUsageUpdate( ctx.setContextWindowUsage(usage); } -export function handleConversationTitleUpdated( - event: ConversationTitleUpdatedEvent, - ctx: StreamHandlerContext, -): void { - // `patchConversation` looks up the Conversation entity by its - // `conversationId` field in the React Query cache. The event's - // `conversationId` (hydrated in `event-parser.ts` via - // `withParsedConversationId` or the envelope fallback in `stream.ts`) - // is the value to match against. - patchConversation( - ctx.queryClient, - ctx.assistantIdRef.current, - event.conversationId, - { title: event.title }, - ); -} - -export function handleNotificationIntent( - event: NotificationIntentEvent, - ctx: StreamHandlerContext, -): void { - const streamCtx = ctx.streamContextRef.current; - const ackAssistantId = streamCtx?.assistantId; - - if (event.targetGuardianPrincipalId) { - if (ackAssistantId && event.deliveryId) { - void sendNotificationIntentAck(ackAssistantId, event.deliveryId, true); - } - return; - } - - const metadataConversationId = extractConversationId(event.deepLinkMetadata); - if ( - metadataConversationId && - metadataConversationId === - useConversationStore.getState().activeConversationId - ) { - if (ackAssistantId && event.deliveryId) { - void sendNotificationIntentAck(ackAssistantId, event.deliveryId, true); - } - return; - } - - void postLocalNotification({ - title: event.title, - body: event.body, - sourceEventName: event.sourceEventName, - deliveryId: event.deliveryId, - deepLinkMetadata: event.deepLinkMetadata, - assistantId: ackAssistantId, - }); -} - export function handleCompactionCircuitOpen( event: CompactionCircuitOpenEvent, ctx: StreamHandlerContext, @@ -123,27 +58,6 @@ export function handleCompactionCircuitClosed( ctx.setCompactionCircuitOpenUntil(null); } -export function handleDiskPressureStatusChanged( - event: DiskPressureStatusChangedEvent, - ctx: StreamHandlerContext, -): void { - ctx.applyDiskPressureStatusEvent(event.status); -} - -export function handleIdentityChanged( - _event: IdentityChangedEvent, - ctx: StreamHandlerContext, -): void { - void ctx.refreshAssistantIdentity(true); -} - -export function handleAvatarUpdated( - _event: AvatarUpdatedEvent, - ctx: StreamHandlerContext, -): void { - ctx.invalidateAvatar(); -} - export function handleTurnProfileAutoRouted( event: TurnProfileAutoRoutedEvent, ctx: StreamHandlerContext, diff --git a/apps/web/src/domains/chat/utils/stream-handlers/test-helpers.ts b/apps/web/src/domains/chat/utils/stream-handlers/test-helpers.ts index 4fbdd508737..7cf6081cf3d 100644 --- a/apps/web/src/domains/chat/utils/stream-handlers/test-helpers.ts +++ b/apps/web/src/domains/chat/utils/stream-handlers/test-helpers.ts @@ -62,9 +62,6 @@ export function makeCtx( scheduleConversationListRefetch: mock(() => {}), queryClient: new QueryClient(), setCompactionCircuitOpenUntil: mock(() => {}), - applyDiskPressureStatusEvent: mock(() => {}), - refreshAssistantIdentity: mock(() => Promise.resolve()), - invalidateAvatar: mock(() => {}), pendingQueuedMessageIdsRef: { current: [] }, requestIdToMessageIdRef: { current: new Map() }, pendingLocalDeletionsRef: { current: new Set() }, diff --git a/apps/web/src/domains/chat/utils/stream-handlers/types.ts b/apps/web/src/domains/chat/utils/stream-handlers/types.ts index a2ff2e11f8e..fd204435252 100644 --- a/apps/web/src/domains/chat/utils/stream-handlers/types.ts +++ b/apps/web/src/domains/chat/utils/stream-handlers/types.ts @@ -8,7 +8,6 @@ import type { ContextWindowUsage } from "@/domains/chat/components/context-windo import type { DisplayMessage } from "@/domains/chat/utils/reconcile"; import type { TurnActions, TurnState } from "@/domains/chat/turn-store"; import type { EndTurnArgs } from "@/domains/chat/turn-coordinator"; -import type { DiskPressureStatusEventPayload } from "@/assistant/use-disk-pressure-monitor"; import type { ChatError, PendingQuestionState } from "@/domains/chat/types"; import type { ChatEventStream } from "@/lib/streaming/stream-transport"; @@ -85,13 +84,6 @@ export interface StreamHandlerContext { // --- Compaction --- setCompactionCircuitOpenUntil: Dispatch>; - // --- External callbacks --- - applyDiskPressureStatusEvent: ( - payload: DiskPressureStatusEventPayload, - ) => void; - refreshAssistantIdentity: (force?: boolean) => Promise; - invalidateAvatar: () => void; - // --- Queue management --- pendingQueuedMessageIdsRef: MutableRefObject; requestIdToMessageIdRef: MutableRefObject>; diff --git a/apps/web/src/domains/conversations/use-conversation-sync.test.tsx b/apps/web/src/domains/conversations/use-conversation-sync.test.tsx index d1fd847d5ee..0771cc1a2f0 100644 --- a/apps/web/src/domains/conversations/use-conversation-sync.test.tsx +++ b/apps/web/src/domains/conversations/use-conversation-sync.test.tsx @@ -6,6 +6,8 @@ import { cleanup, renderHook, waitFor } from "@testing-library/react"; import { conversationGroupsQueryKey, } from "@/domains/conversations/conversation-queries"; +import type { AssistantEvent } from "@/types/event-types"; +import type { Conversation } from "@/types/conversation-types"; import { conversationsQueryKey } from "@/lib/sync/query-tags"; import { SYNC_TAGS, type SyncChangedEvent } from "@/lib/sync/types"; import { @@ -291,4 +293,27 @@ describe("useConversationSync", () => { await Promise.resolve(); expect(spy).not.toHaveBeenCalled(); }); + + test("patches conversation title in cache on conversation_title_updated", async () => { + const queryClient = freshQueryClient(); + queryClient.setQueryData( + conversationsQueryKey("asst-1"), + [{ conversationId: "conv-1", title: "Old Title" } as Conversation], + ); + renderHook(() => useConversationSync("asst-1", true), { + wrapper: createWrapper(queryClient), + }); + useEventBusStore.getState().publish("sse.event", { + type: "conversation_title_updated", + conversationId: "conv-1", + title: "New Title", + } as unknown as AssistantEvent); + await waitFor(() => { + const cached = queryClient.getQueryData( + conversationsQueryKey("asst-1"), + ); + const conv = cached?.find((c) => c.conversationId === "conv-1"); + expect(conv?.title).toBe("New Title"); + }); + }); }); diff --git a/apps/web/src/domains/conversations/use-conversation-sync.ts b/apps/web/src/domains/conversations/use-conversation-sync.ts index 97b4ba91af2..f709e4c588b 100644 --- a/apps/web/src/domains/conversations/use-conversation-sync.ts +++ b/apps/web/src/domains/conversations/use-conversation-sync.ts @@ -1,10 +1,11 @@ /** * Domain-scoped bus consumer for conversation cache invalidation. * - * Routes `conversationsList` umbrella tags and per-conversation - * `conversation::metadata` tags into TanStack Query cache - * operations. Debounces list-level invalidations so rapid-fire - * sync_changed bursts collapse into a single refetch. + * Routes `conversationsList` umbrella tags, per-conversation + * `conversation::metadata` tags, and the `conversation_title_updated` + * event into TanStack Query cache operations. Debounces list-level + * invalidations so rapid-fire sync_changed bursts collapse into a + * single refetch. * * Also handles SSE reconnect (`sse.opened`) by scheduling a debounced * conversation list refetch to catch events missed during the gap. @@ -24,6 +25,7 @@ import { useQueryClient } from "@tanstack/react-query"; import { conversationGroupsQueryKey, + patchConversation, refreshConversationRow, } from "@/domains/conversations/conversation-queries"; import { useBusSubscription } from "@/hooks/use-bus-subscription"; @@ -65,13 +67,26 @@ export function useConversationSync( useBusSubscription("sse.event", (event) => { if (!assistantId || !isAssistantActive) return; - if (event.type !== "sync_changed") return; - handleConversationSyncTags( - event, - assistantId, - queryClient, - debounceTimerRef, - ); + + switch (event.type) { + case "sync_changed": + handleConversationSyncTags( + event, + assistantId, + queryClient, + debounceTimerRef, + ); + return; + + case "conversation_title_updated": + patchConversation( + queryClient, + assistantId, + event.conversationId, + { title: event.title }, + ); + return; + } }); useBusSubscription("sse.opened", ({ cause }) => { diff --git a/apps/web/src/domains/home/hooks/use-home-state-query.ts b/apps/web/src/domains/home/hooks/use-home-state-query.ts index 3293eb10731..766944a2fae 100644 --- a/apps/web/src/domains/home/hooks/use-home-state-query.ts +++ b/apps/web/src/domains/home/hooks/use-home-state-query.ts @@ -1,15 +1,10 @@ import { useCallback } from "react"; import { useQuery, useQueryClient } from "@tanstack/react-query"; +import { homeStateQueryKey } from "@/lib/sync/query-tags"; import { fetchRelationshipState } from "../api"; import type { RelationshipState } from "../types"; -const QUERY_KEY_PREFIX = "home-state" as const; - -function homeStateQueryKey(assistantId: string) { - return [QUERY_KEY_PREFIX, assistantId] as const; -} - /** * React Query hook for the assistant relationship state (tier, facts, * capabilities, conversation count, etc.). diff --git a/apps/web/src/hooks/use-assistant-resource-sync.test.tsx b/apps/web/src/hooks/use-assistant-resource-sync.test.tsx index 7eec9bdbee5..4458f7a09d3 100644 --- a/apps/web/src/hooks/use-assistant-resource-sync.test.tsx +++ b/apps/web/src/hooks/use-assistant-resource-sync.test.tsx @@ -12,6 +12,7 @@ import { assistantSoundsConfigQueryKey, avatarQueryKey, HOME_FEED_QUERY_KEY_PREFIX, + HOME_STATE_QUERY_KEY_PREFIX, } from "@/lib/sync/query-tags"; import { SYNC_TAGS, type SyncChangedEvent } from "@/lib/sync/types"; import { @@ -136,7 +137,7 @@ describe("useAssistantResourceSync", () => { }); }); - test("invalidates home-feed queries on home_feed_updated and relationship_state_updated", async () => { + test("invalidates home-feed query on home_feed_updated", async () => { const queryClient = freshQueryClient(); const spy = mock(() => Promise.resolve()); queryClient.invalidateQueries = spy as never; @@ -148,18 +149,67 @@ describe("useAssistantResourceSync", () => { updatedAt: "2026-05-21T00:00:00Z", newItemCount: 1, } as unknown as AssistantEvent); + await waitFor(() => { + expect(spy).toHaveBeenCalledWith({ + queryKey: [HOME_FEED_QUERY_KEY_PREFIX], + }); + }); + }); + + test("invalidates both home-feed and home-state on relationship_state_updated", async () => { + const queryClient = freshQueryClient(); + const calls: unknown[][] = []; + queryClient.invalidateQueries = ((arg: unknown) => { + calls.push([arg]); + return Promise.resolve(); + }) as never; + renderHook(() => useAssistantResourceSync("asst-1", true), { + wrapper: createWrapper(queryClient), + }); emit({ type: "relationship_state_updated", updatedAt: "2026-05-21T00:00:00Z", } as unknown as AssistantEvent); await waitFor(() => { - const homeCalls = (spy.mock.calls as unknown as Array<[unknown]>).filter( - (call) => { - const arg = call[0] as { queryKey: readonly unknown[] } | undefined; - return arg?.queryKey?.[0] === HOME_FEED_QUERY_KEY_PREFIX; - }, + const queryKeys = calls.map( + ([arg]) => (arg as { queryKey: readonly unknown[] }).queryKey, + ); + expect(queryKeys).toEqual( + expect.arrayContaining([ + [HOME_FEED_QUERY_KEY_PREFIX], + [HOME_STATE_QUERY_KEY_PREFIX], + ]) as never, ); - expect(homeCalls.length).toBe(2); + }); + }); + + test("invalidates identity query on identity_changed event", async () => { + const queryClient = freshQueryClient(); + const spy = mock(() => Promise.resolve()); + queryClient.invalidateQueries = spy as never; + renderHook(() => useAssistantResourceSync("asst-1", true), { + wrapper: createWrapper(queryClient), + }); + emit({ type: "identity_changed" } as unknown as AssistantEvent); + await waitFor(() => { + expect(spy).toHaveBeenCalledWith({ + queryKey: assistantIdentityQueryKey("asst-1"), + }); + }); + }); + + test("invalidates avatar query on avatar_updated event", async () => { + const queryClient = freshQueryClient(); + const spy = mock(() => Promise.resolve()); + queryClient.invalidateQueries = spy as never; + renderHook(() => useAssistantResourceSync("asst-1", true), { + wrapper: createWrapper(queryClient), + }); + emit({ type: "avatar_updated" } as unknown as AssistantEvent); + await waitFor(() => { + expect(spy).toHaveBeenCalledWith({ + queryKey: avatarQueryKey("asst-1"), + }); }); }); diff --git a/apps/web/src/hooks/use-assistant-resource-sync.ts b/apps/web/src/hooks/use-assistant-resource-sync.ts index c6aa68f5a2f..b4f69db67f4 100644 --- a/apps/web/src/hooks/use-assistant-resource-sync.ts +++ b/apps/web/src/hooks/use-assistant-resource-sync.ts @@ -1,11 +1,13 @@ /** * Bus consumer for assistant-level resource cache invalidation. * - * Routes avatar, identity, config, sounds, and schedules - * `sync_changed` tags — plus `home_feed_updated` and - * `relationship_state_updated` events — into TanStack Query cache - * invalidations. All operations are simple one-liner invalidations - * with no debouncing or per-row patching. + * Routes `sync_changed` tags (avatar, identity, config, sounds, + * schedules) and discrete SSE events (`home_feed_updated`, + * `relationship_state_updated`, `identity_changed`, `avatar_updated`) + * into TanStack Query cache invalidations. + * + * All operations are stateless one-liner invalidations with no + * debouncing or per-row patching. * * More complex sync domains (conversations, feature flags) own their * own hooks: @@ -29,6 +31,7 @@ import { assistantSoundsConfigQueryKey, avatarQueryKey, HOME_FEED_QUERY_KEY_PREFIX, + HOME_STATE_QUERY_KEY_PREFIX, } from "@/lib/sync/query-tags"; import { SYNC_TAGS } from "@/lib/sync/types"; @@ -36,10 +39,10 @@ import { SYNC_TAGS } from "@/lib/sync/types"; * Subscribes to assistant-resource sync events via the event bus. * * Handles `sync_changed` tags for avatar, identity, config, sounds, - * and schedules, plus `home_feed_updated` / `relationship_state_updated` - * event types. These are all stateless invalidations — no reconnect - * handling needed since the underlying `useQuery` hooks refetch - * automatically when the query becomes stale. + * and schedules, plus discrete event types for home feed/state changes + * and identity/avatar pushes. These are all stateless invalidations — + * no reconnect handling needed since the underlying `useQuery` hooks + * refetch automatically when the query becomes stale. */ export function useAssistantResourceSync( assistantId: string | null, @@ -90,10 +93,30 @@ export function useAssistantResourceSync( return; case "home_feed_updated": + void queryClient.invalidateQueries({ + queryKey: [HOME_FEED_QUERY_KEY_PREFIX], + }); + return; + case "relationship_state_updated": void queryClient.invalidateQueries({ queryKey: [HOME_FEED_QUERY_KEY_PREFIX], }); + void queryClient.invalidateQueries({ + queryKey: [HOME_STATE_QUERY_KEY_PREFIX], + }); + return; + + case "identity_changed": + void queryClient.invalidateQueries({ + queryKey: assistantIdentityQueryKey(assistantId), + }); + return; + + case "avatar_updated": + void queryClient.invalidateQueries({ + queryKey: avatarQueryKey(assistantId), + }); return; } }); diff --git a/apps/web/src/hooks/use-document-editor-sync.ts b/apps/web/src/hooks/use-document-editor-sync.ts new file mode 100644 index 00000000000..3601df1c209 --- /dev/null +++ b/apps/web/src/hooks/use-document-editor-sync.ts @@ -0,0 +1,27 @@ +/** + * Bus consumer for `document_editor_update` SSE events. + * + * Applies streamed document content updates to the viewer store. + * The daemon sends incremental markdown content (append or replace + * mode) as the assistant edits a document surface. + * + * References: + * - EVENT_BUS.md — bus subscription contract + * - stores/viewer-store.ts — document editor state + */ + +import { useBusSubscription } from "@/hooks/use-bus-subscription"; +import { useViewerStore } from "@/stores/viewer-store"; + +/** + * Subscribes to `document_editor_update` SSE events via the event bus + * and forwards content updates to the viewer store. + */ +export function useDocumentEditorSync(): void { + useBusSubscription("sse.event", (event) => { + if (event.type !== "document_editor_update") return; + useViewerStore + .getState() + .updateDocumentContent(event.surfaceId, event.markdown, event.mode); + }); +} diff --git a/apps/web/src/hooks/use-notification-intent-sync.ts b/apps/web/src/hooks/use-notification-intent-sync.ts new file mode 100644 index 00000000000..294a2254d9b --- /dev/null +++ b/apps/web/src/hooks/use-notification-intent-sync.ts @@ -0,0 +1,75 @@ +/** + * Bus consumer for `notification_intent` SSE events. + * + * Turns daemon-pushed notification intents into local browser or + * Capacitor notifications. Skips guardian-scoped notifications + * (the web client does not participate in guardian binding) and + * notifications targeting the currently focused conversation + * (the user is already looking at it). + * + * Acks every notification back to the daemon so delivery audit + * trails stay consistent with the macOS client. + * + * References: + * - EVENT_BUS.md — bus subscription contract + * - runtime/notifications.ts — notification scheduling and ack API + */ + +import { useBusSubscription } from "@/hooks/use-bus-subscription"; +import { + extractConversationId, + postLocalNotification, + sendNotificationIntentAck, +} from "@/runtime/notifications"; +import { useConversationStore } from "@/stores/conversation-store"; + +/** + * Subscribes to `notification_intent` SSE events via the event bus + * and schedules local notifications. + * + * @param assistantId — current assistant; `null` disables the subscription + */ +export function useNotificationIntentSync( + assistantId: string | null, +): void { + useBusSubscription("sse.event", (event) => { + if (event.type !== "notification_intent") return; + + const ackAssistantId = assistantId; + + // Guardian-scoped notifications are for devices bound to that + // guardian identity. The web/Capacitor client does not participate + // in guardian binding — skip to avoid leaking to unintended devices. + if (event.targetGuardianPrincipalId) { + if (ackAssistantId && event.deliveryId) { + void sendNotificationIntentAck(ackAssistantId, event.deliveryId, true); + } + return; + } + + // If the notification targets the conversation the user is already + // viewing, suppress the banner and ack silently. + const metadataConversationId = extractConversationId( + event.deepLinkMetadata, + ); + if ( + metadataConversationId && + metadataConversationId === + useConversationStore.getState().activeConversationId + ) { + if (ackAssistantId && event.deliveryId) { + void sendNotificationIntentAck(ackAssistantId, event.deliveryId, true); + } + return; + } + + void postLocalNotification({ + title: event.title, + body: event.body, + sourceEventName: event.sourceEventName, + deliveryId: event.deliveryId, + deepLinkMetadata: event.deepLinkMetadata, + assistantId: ackAssistantId ?? undefined, + }); + }); +} diff --git a/apps/web/src/lib/sync/query-tags.ts b/apps/web/src/lib/sync/query-tags.ts index 56bce24be47..2cc4db70c62 100644 --- a/apps/web/src/lib/sync/query-tags.ts +++ b/apps/web/src/lib/sync/query-tags.ts @@ -66,6 +66,12 @@ export function homeFeedQueryKey(assistantId: string) { return [HOME_FEED_QUERY_KEY_PREFIX, assistantId] as const; } +export const HOME_STATE_QUERY_KEY_PREFIX = "home-state" as const; + +export function homeStateQueryKey(assistantId: string) { + return [HOME_STATE_QUERY_KEY_PREFIX, assistantId] as const; +} + export function invalidateAssistantConfigQueries( queryClient: QueryClient, assistantId: string | null | undefined, diff --git a/apps/web/src/root-layout.tsx b/apps/web/src/root-layout.tsx index bec1a0db1e4..a6c006810b1 100644 --- a/apps/web/src/root-layout.tsx +++ b/apps/web/src/root-layout.tsx @@ -11,6 +11,8 @@ import { import { useAuthStore } from "@/stores/auth-store"; import { useEnvironmentStore } from "@/stores/environment-store"; import { useAssistantResourceSync } from "@/hooks/use-assistant-resource-sync"; +import { useDocumentEditorSync } from "@/hooks/use-document-editor-sync"; +import { useNotificationIntentSync } from "@/hooks/use-notification-intent-sync"; import { useConversationSync } from "@/domains/conversations/use-conversation-sync"; import { useFeatureFlagBusSync } from "@/hooks/use-feature-flag-bus-sync"; import { useClientFeatureFlagSync } from "@/hooks/use-client-feature-flag-sync"; @@ -87,6 +89,8 @@ export function RootLayout() { useAssistantResourceSync(lifecycle.assistantId, isAssistantActive); useConversationSync(lifecycle.assistantId, isAssistantActive); useFeatureFlagBusSync(lifecycle.assistantId, isAssistantActive); + useNotificationIntentSync(lifecycle.assistantId); + useDocumentEditorSync(); useEventBusInit({ assistantId: lifecycle.assistantId, From 881fa06334f6f6bb774df21f68de38115bceb788 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 29 May 2026 21:35:48 +0000 Subject: [PATCH 2/5] chore(web): update cross-domain allowlist after handler decentralization MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Remove 2 stale entries for metadata-handlers.ts and its test — they no longer import from the conversations domain since those handlers moved to bus subscribers. Co-Authored-By: ashlee@vellum.ai --- apps/web/.cross-domain-allowlist.json | 6 ------ 1 file changed, 6 deletions(-) diff --git a/apps/web/.cross-domain-allowlist.json b/apps/web/.cross-domain-allowlist.json index 0a7b8435079..10c337aa9b4 100644 --- a/apps/web/.cross-domain-allowlist.json +++ b/apps/web/.cross-domain-allowlist.json @@ -25,12 +25,6 @@ "src/domains/chat/hooks/use-voice-input.ts": [ "voice" ], - "src/domains/chat/utils/stream-handlers/metadata-handlers.test.ts": [ - "conversations" - ], - "src/domains/chat/utils/stream-handlers/metadata-handlers.ts": [ - "conversations" - ], "src/domains/conversations/use-attention-tracking.ts": [ "chat" ], From f2a443b206d8a7b3fd9f1ae2d924a9b2b9df75f8 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 29 May 2026 21:39:54 +0000 Subject: [PATCH 3/5] fix(web): gate notification suppression on conversation route visibility activeConversationId persists across route changes (never cleared on navigation). In the old monolithic handler, notification suppression was implicitly scoped to ChatPage. Now that the bus subscriber runs in RootLayout, we also verify the URL pathname matches the conversation route to prevent stale-id suppression on home/settings/etc. Co-Authored-By: ashlee@vellum.ai --- apps/web/src/hooks/use-notification-intent-sync.ts | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/apps/web/src/hooks/use-notification-intent-sync.ts b/apps/web/src/hooks/use-notification-intent-sync.ts index 294a2254d9b..bcd9f8ab2de 100644 --- a/apps/web/src/hooks/use-notification-intent-sync.ts +++ b/apps/web/src/hooks/use-notification-intent-sync.ts @@ -4,8 +4,9 @@ * Turns daemon-pushed notification intents into local browser or * Capacitor notifications. Skips guardian-scoped notifications * (the web client does not participate in guardian binding) and - * notifications targeting the currently focused conversation - * (the user is already looking at it). + * notifications targeting the conversation the user is actively + * viewing (verified by both store state and URL pathname, since + * `activeConversationId` persists across route changes). * * Acks every notification back to the daemon so delivery audit * trails stay consistent with the macOS client. @@ -47,15 +48,18 @@ export function useNotificationIntentSync( return; } - // If the notification targets the conversation the user is already - // viewing, suppress the banner and ack silently. + // Suppress the banner when the user is already viewing the target + // conversation. `activeConversationId` is never cleared on navigation, + // so we also verify the URL matches the conversation route — otherwise + // a stale id would suppress notifications on home/settings/etc. const metadataConversationId = extractConversationId( event.deepLinkMetadata, ); if ( metadataConversationId && metadataConversationId === - useConversationStore.getState().activeConversationId + useConversationStore.getState().activeConversationId && + window.location.pathname.startsWith("/assistant/conversations/") ) { if (ackAssistantId && event.deliveryId) { void sendNotificationIntentAck(ackAssistantId, event.deliveryId, true); From 8ae2dd0b3b2d4e654ec37c68d5c6d5896cbf384a Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 29 May 2026 21:42:48 +0000 Subject: [PATCH 4/5] refactor(web): remove redundant ackAssistantId alias in notification sync useBusSubscription stabilizes the handler ref so the closure always captures the latest assistantId. The alias was a vestige of the old monolithic handler's ref-based capture pattern. Co-Authored-By: ashlee@vellum.ai --- apps/web/src/hooks/use-notification-intent-sync.ts | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/apps/web/src/hooks/use-notification-intent-sync.ts b/apps/web/src/hooks/use-notification-intent-sync.ts index bcd9f8ab2de..b1e404f1908 100644 --- a/apps/web/src/hooks/use-notification-intent-sync.ts +++ b/apps/web/src/hooks/use-notification-intent-sync.ts @@ -36,14 +36,12 @@ export function useNotificationIntentSync( useBusSubscription("sse.event", (event) => { if (event.type !== "notification_intent") return; - const ackAssistantId = assistantId; - // Guardian-scoped notifications are for devices bound to that // guardian identity. The web/Capacitor client does not participate // in guardian binding — skip to avoid leaking to unintended devices. if (event.targetGuardianPrincipalId) { - if (ackAssistantId && event.deliveryId) { - void sendNotificationIntentAck(ackAssistantId, event.deliveryId, true); + if (assistantId && event.deliveryId) { + void sendNotificationIntentAck(assistantId, event.deliveryId, true); } return; } @@ -61,8 +59,8 @@ export function useNotificationIntentSync( useConversationStore.getState().activeConversationId && window.location.pathname.startsWith("/assistant/conversations/") ) { - if (ackAssistantId && event.deliveryId) { - void sendNotificationIntentAck(ackAssistantId, event.deliveryId, true); + if (assistantId && event.deliveryId) { + void sendNotificationIntentAck(assistantId, event.deliveryId, true); } return; } @@ -73,7 +71,7 @@ export function useNotificationIntentSync( sourceEventName: event.sourceEventName, deliveryId: event.deliveryId, deepLinkMetadata: event.deepLinkMetadata, - assistantId: ackAssistantId ?? undefined, + assistantId: assistantId ?? undefined, }); }); } From 1e8a986a7887a1d2a5b3f487f755b664fbe6d4d0 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 29 May 2026 21:55:50 +0000 Subject: [PATCH 5/5] docs(web): note why useDocumentEditorSync takes no assistantId Co-Authored-By: ashlee@vellum.ai --- apps/web/src/hooks/use-document-editor-sync.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/apps/web/src/hooks/use-document-editor-sync.ts b/apps/web/src/hooks/use-document-editor-sync.ts index 3601df1c209..3ab779ced8f 100644 --- a/apps/web/src/hooks/use-document-editor-sync.ts +++ b/apps/web/src/hooks/use-document-editor-sync.ts @@ -16,6 +16,9 @@ import { useViewerStore } from "@/stores/viewer-store"; /** * Subscribes to `document_editor_update` SSE events via the event bus * and forwards content updates to the viewer store. + * + * Unlike other bus subscribers, this takes no `assistantId` — the viewer + * store is global and document surface ids are globally unique. */ export function useDocumentEditorSync(): void { useBusSubscription("sse.event", (event) => {