diff --git a/apps/web/.cross-domain-allowlist.json b/apps/web/.cross-domain-allowlist.json index 0a7b8435079..10c337aa9b4 100644 --- a/apps/web/.cross-domain-allowlist.json +++ b/apps/web/.cross-domain-allowlist.json @@ -25,12 +25,6 @@ "src/domains/chat/hooks/use-voice-input.ts": [ "voice" ], - "src/domains/chat/utils/stream-handlers/metadata-handlers.test.ts": [ - "conversations" - ], - "src/domains/chat/utils/stream-handlers/metadata-handlers.ts": [ - "conversations" - ], "src/domains/conversations/use-attention-tracking.ts": [ "chat" ], diff --git a/apps/web/src/assistant/use-disk-pressure-monitor.ts b/apps/web/src/assistant/use-disk-pressure-monitor.ts index 535adcae885..92fc5955ac8 100644 --- a/apps/web/src/assistant/use-disk-pressure-monitor.ts +++ b/apps/web/src/assistant/use-disk-pressure-monitor.ts @@ -12,6 +12,7 @@ import { getDiskPressureMonitorMode, type DiskPressureMonitorMode, } from "@/assistant/disk-pressure"; +import { useBusSubscription } from "@/hooks/use-bus-subscription"; import { useEventBusStore } from "@/stores/event-bus-store"; export interface UseDiskPressureMonitorOptions { @@ -204,6 +205,15 @@ export function useDiskPressureMonitor({ [assistantId, applyStatusForAssistant, clearStatus, enabled], ); + // React to daemon-pushed disk pressure events via the event bus. + // Complements the polling interval and resume-refresh above so + // status changes are reflected immediately without waiting for + // the next poll tick. + useBusSubscription("sse.event", (event) => { + if (event.type !== "disk_pressure_status_changed") return; + applyStatusEvent(event.status); + }); + const acknowledge = useCallback(async () => { const requestedAssistantId = assistantId; diff --git a/apps/web/src/domains/chat/chat-page.tsx b/apps/web/src/domains/chat/chat-page.tsx index 89c9dfdaf17..fec18e95d91 100644 --- a/apps/web/src/domains/chat/chat-page.tsx +++ b/apps/web/src/domains/chat/chat-page.tsx @@ -646,9 +646,6 @@ export function ChatPage() { setContextWindowUsage, scheduleConversationListRefetch, setCompactionCircuitOpenUntil, - applyDiskPressureStatusEvent: diskPressure.applyStatusEvent, - refreshAssistantIdentity, - invalidateAvatar, dispatchSyncChanged, pendingQueuedMessageIdsRef, requestIdToMessageIdRef, diff --git a/apps/web/src/domains/chat/hooks/use-stream-event-handler-guard.test.tsx b/apps/web/src/domains/chat/hooks/use-stream-event-handler-guard.test.tsx index 7225787888f..8a8448515b4 100644 --- a/apps/web/src/domains/chat/hooks/use-stream-event-handler-guard.test.tsx +++ b/apps/web/src/domains/chat/hooks/use-stream-event-handler-guard.test.tsx @@ -28,18 +28,6 @@ mock.module( }, }), ); -mock.module( - "@/domains/chat/utils/stream-handlers/home-handlers", - () => ({ - handleHomeFeedUpdated: () => { - handlerCalls.push({ kind: "home_feed_updated" }); - }, - handleRelationshipStateUpdated: () => { - handlerCalls.push({ kind: "relationship_state_updated" }); - }, - }), -); - const { useStreamEventHandler } = await import( "@/domains/chat/hooks/use-stream-event-handler" ); @@ -116,9 +104,6 @@ function renderHandler( setContextWindowUsage: () => {}, scheduleConversationListRefetch: () => {}, setCompactionCircuitOpenUntil: () => {}, - applyDiskPressureStatusEvent: () => {}, - refreshAssistantIdentity: async () => {}, - invalidateAvatar: () => {}, dispatchSyncChanged: () => {}, } as never), { wrapper }, @@ -218,7 +203,7 @@ describe("handleStreamEvent — defense-in-depth conversation routing guard", () expect(true).toBe(true); }); - test("forwards a global event (home_feed_updated) regardless of conversation context", () => { + test("no-ops a cross-domain event (home_feed_updated) handled by bus subscribers", () => { const refs = noopRefs(); const { handleStreamEvent } = renderHandler(refs, { streamConversationId: null, @@ -230,7 +215,10 @@ describe("handleStreamEvent — defense-in-depth conversation routing guard", () } as unknown as AssistantEvent, 0, ); - expect(handlerCalls).toContainEqual({ kind: "home_feed_updated" }); + // home_feed_updated is now handled by useAssistantResourceSync (bus + // subscriber), not the monolithic handler. The switch case is a no-op. + const homeFeedCalls = handlerCalls.filter((c) => c.kind === "home_feed_updated"); + expect(homeFeedCalls).toHaveLength(0); }); test("rejects events whose epoch is stale (regardless of conversation key)", () => { diff --git a/apps/web/src/domains/chat/hooks/use-stream-event-handler.ts b/apps/web/src/domains/chat/hooks/use-stream-event-handler.ts index c43a214e484..eb125068bfb 100644 --- a/apps/web/src/domains/chat/hooks/use-stream-event-handler.ts +++ b/apps/web/src/domains/chat/hooks/use-stream-event-handler.ts @@ -14,14 +14,9 @@ import type { DisplayMessage } from "@/domains/chat/utils/reconcile"; import { tailIsStreamingAssistant } from "@/domains/chat/hooks/stream-message-updaters"; import { useTurnStore } from "@/domains/chat/turn-store"; import { endTurn } from "@/domains/chat/turn-coordinator"; -import { useViewerStore } from "@/stores/viewer-store"; -import type { DiskPressureStatusEventPayload } from "@/assistant/use-disk-pressure-monitor"; + import { recordDiagnostic, summarizeAssistantEvent } from "@/lib/diagnostics"; import { isConversationScopedStreamEvent } from "@/domains/chat/utils/chat"; -import { - handleHomeFeedUpdated, - handleRelationshipStateUpdated, -} from "@/domains/chat/utils/stream-handlers/home-handlers"; import { handleOpenUrl, handleNavigateSettings, @@ -56,13 +51,8 @@ import { } from "@/domains/chat/utils/stream-handlers/tool-call-handlers"; import { handleUsageUpdate, - handleConversationTitleUpdated, - handleNotificationIntent, handleCompactionCircuitOpen, handleCompactionCircuitClosed, - handleDiskPressureStatusChanged, - handleIdentityChanged, - handleAvatarUpdated, handleTurnProfileAutoRouted, } from "@/domains/chat/utils/stream-handlers/metadata-handlers"; import { @@ -140,12 +130,7 @@ export interface UseStreamEventHandlerParams { // --- Compaction --- setCompactionCircuitOpenUntil: Dispatch>; - // --- External callbacks (stabilized via refs in the hook) --- - applyDiskPressureStatusEvent: ( - payload: DiskPressureStatusEventPayload, - ) => void; - refreshAssistantIdentity: (force?: boolean) => Promise; - invalidateAvatar: () => void; + // --- Sync router --- dispatchSyncChanged: (event: AssistantSyncChangedEvent) => void; // --- Queue management --- @@ -195,9 +180,6 @@ export function useStreamEventHandler( setContextWindowUsage, scheduleConversationListRefetch, setCompactionCircuitOpenUntil, - applyDiskPressureStatusEvent, - refreshAssistantIdentity, - invalidateAvatar, dispatchSyncChanged, pendingQueuedMessageIdsRef, requestIdToMessageIdRef, @@ -209,18 +191,6 @@ export function useStreamEventHandler( const toolCallIdCounterRef = useRef(0); const currentAssistantMessageIdRef = useRef(undefined); - // Stabilize external callbacks that may not be memoized upstream. - // Storing them in refs keeps handleStreamEvent's identity stable across - // renders while always calling the latest version of each callback. - // Reference: https://react.dev/reference/react/useCallback#preventing-an-effect-from-firing-too-often - const applyDiskPressureStatusEventRef = useRef(applyDiskPressureStatusEvent); - applyDiskPressureStatusEventRef.current = applyDiskPressureStatusEvent; - const refreshAssistantIdentityRef = useRef(refreshAssistantIdentity); - refreshAssistantIdentityRef.current = refreshAssistantIdentity; - const invalidateAvatarRef = useRef(invalidateAvatar); - invalidateAvatarRef.current = invalidateAvatar; - - // --- Main event handler --- const handleStreamEvent = useCallback( @@ -311,12 +281,6 @@ export function useStreamEventHandler( scheduleConversationListRefetch, queryClient, setCompactionCircuitOpenUntil, - applyDiskPressureStatusEvent: (...args) => - applyDiskPressureStatusEventRef.current(...args), - refreshAssistantIdentity: (...args) => - refreshAssistantIdentityRef.current(...args), - invalidateAvatar: (...args) => - invalidateAvatarRef.current(...args), pendingQueuedMessageIdsRef, requestIdToMessageIdRef, pendingLocalDeletionsRef, @@ -410,27 +374,14 @@ export function useStreamEventHandler( // to the Electron client and `conversation_list_invalidated` // is retired from the event types entirely. break; - case "conversation_title_updated": - handleConversationTitleUpdated(event, ctx); - break; - case "notification_intent": - handleNotificationIntent(event, ctx); - break; + case "compaction_circuit_open": handleCompactionCircuitOpen(event, ctx); break; case "compaction_circuit_closed": handleCompactionCircuitClosed(event, ctx); break; - case "disk_pressure_status_changed": - handleDiskPressureStatusChanged(event, ctx); - break; - case "identity_changed": - handleIdentityChanged(event, ctx); - break; - case "avatar_updated": - handleAvatarUpdated(event, ctx); - break; + case "turn_profile_auto_routed": handleTurnProfileAutoRouted(event, ctx); break; @@ -446,12 +397,7 @@ export function useStreamEventHandler( case "message_request_complete": handleMessageRequestComplete(event, ctx); break; - case "home_feed_updated": - handleHomeFeedUpdated(queryClient, event); - break; - case "relationship_state_updated": - handleRelationshipStateUpdated(queryClient, event); - break; + case "subagent_spawned": handleSubagentSpawned(event, ctx); break; @@ -464,21 +410,25 @@ export function useStreamEventHandler( case "sync_changed": dispatchSyncChanged(event); break; + + // Cross-domain events handled by bus subscribers mounted in + // RootLayout (useAssistantResourceSync, useConversationSync, + // useNotificationIntentSync, useDocumentEditorSync) or + // ChatPage-scoped hooks (useDiskPressureMonitor). The chat + // handler is intentionally a no-op for these. + case "home_feed_updated": + case "relationship_state_updated": + case "identity_changed": + case "avatar_updated": + case "disk_pressure_status_changed": + case "notification_intent": case "document_editor_update": - useViewerStore.getState().updateDocumentContent( - event.surfaceId, - event.markdown, - event.mode, - ); - break; + case "conversation_title_updated": case "document_comment_created": case "document_comment_resolved": case "document_comment_reopened": case "document_comment_deleted": case "interaction_resolved": - // Attention reconciliation lives in `useAttentionTracking`, which - // subscribes to the event bus directly. The chat-stream handler - // is intentionally a no-op here. break; case "unknown": break; @@ -497,9 +447,6 @@ export function useStreamEventHandler( // Stable deps listed for correctness — React guarantees identity // stability for state setters and refs, so these never trigger // re-creation of the callback. - // Note: applyDiskPressureStatusEvent, refreshAssistantIdentity, and - // invalidateAvatar are accessed via refs (stable identity) and are - // intentionally excluded from this dep array. dispatchSyncChanged, queryClient, streamEpochRef, diff --git a/apps/web/src/domains/chat/utils/stream-handlers/home-handlers.ts b/apps/web/src/domains/chat/utils/stream-handlers/home-handlers.ts deleted file mode 100644 index 9fecf1d0de5..00000000000 --- a/apps/web/src/domains/chat/utils/stream-handlers/home-handlers.ts +++ /dev/null @@ -1,20 +0,0 @@ -import type { QueryClient } from "@tanstack/react-query"; -import type { - HomeFeedUpdatedEvent, - RelationshipStateUpdatedEvent, -} from "@vellumai/assistant-api"; -import { HOME_FEED_QUERY_KEY_PREFIX } from "@/lib/sync/query-tags"; - -export function handleHomeFeedUpdated( - queryClient: QueryClient, - _event: HomeFeedUpdatedEvent, -): void { - queryClient.invalidateQueries({ queryKey: [HOME_FEED_QUERY_KEY_PREFIX] }); -} - -export function handleRelationshipStateUpdated( - queryClient: QueryClient, - _event: RelationshipStateUpdatedEvent, -): void { - queryClient.invalidateQueries({ queryKey: ["home-state"] }); -} diff --git a/apps/web/src/domains/chat/utils/stream-handlers/metadata-handlers.test.ts b/apps/web/src/domains/chat/utils/stream-handlers/metadata-handlers.test.ts index d11bf9b8fa1..dfcb95e2d12 100644 --- a/apps/web/src/domains/chat/utils/stream-handlers/metadata-handlers.test.ts +++ b/apps/web/src/domains/chat/utils/stream-handlers/metadata-handlers.test.ts @@ -1,16 +1,11 @@ import { describe, expect, it } from "bun:test"; import { makeCtx } from "@/domains/chat/utils/stream-handlers/test-helpers"; -import { conversationsQueryKey } from "@/domains/conversations/conversation-queries"; -import type { Conversation } from "@/types/conversation-types"; + import { handleUsageUpdate, - handleConversationTitleUpdated, handleCompactionCircuitOpen, handleCompactionCircuitClosed, - handleDiskPressureStatusChanged, - handleIdentityChanged, - handleAvatarUpdated, } from "@/domains/chat/utils/stream-handlers/metadata-handlers"; describe("handleUsageUpdate", () => { @@ -71,31 +66,6 @@ describe("handleUsageUpdate", () => { }); }); -describe("handleConversationTitleUpdated", () => { - it("patches conversation title in the conversations query cache", () => { - const ctx = makeCtx(); - ctx.queryClient.setQueryData( - conversationsQueryKey(ctx.assistantIdRef.current), - [{ conversationId: "conv-1", title: "Old Title" } as Conversation], - ); - - handleConversationTitleUpdated( - { - type: "conversation_title_updated", - conversationId: "conv-1", - title: "New Title", - }, - ctx, - ); - - const cached = ctx.queryClient.getQueryData( - conversationsQueryKey(ctx.assistantIdRef.current), - ); - const conv = cached?.find((c) => c.conversationId === "conv-1"); - expect(conv?.title).toBe("New Title"); - }); -}); - describe("handleCompactionCircuitOpen", () => { it("sets compaction circuit open until date", () => { const ctx = makeCtx(); @@ -123,29 +93,4 @@ describe("handleCompactionCircuitClosed", () => { }); }); -describe("handleDiskPressureStatusChanged", () => { - it("delegates to applyDiskPressureStatusEvent", () => { - const ctx = makeCtx(); - handleDiskPressureStatusChanged( - { type: "disk_pressure_status_changed", status: null }, - ctx, - ); - expect(ctx.applyDiskPressureStatusEvent).toHaveBeenCalledWith(null); - }); -}); -describe("handleIdentityChanged", () => { - it("calls refreshAssistantIdentity with force=true", () => { - const ctx = makeCtx(); - handleIdentityChanged({ type: "identity_changed" }, ctx); - expect(ctx.refreshAssistantIdentity).toHaveBeenCalledWith(true); - }); -}); - -describe("handleAvatarUpdated", () => { - it("calls invalidateAvatar", () => { - const ctx = makeCtx(); - handleAvatarUpdated({ type: "avatar_updated" }, ctx); - expect(ctx.invalidateAvatar).toHaveBeenCalled(); - }); -}); diff --git a/apps/web/src/domains/chat/utils/stream-handlers/metadata-handlers.ts b/apps/web/src/domains/chat/utils/stream-handlers/metadata-handlers.ts index b2127bdf0b3..2a6826fc0ce 100644 --- a/apps/web/src/domains/chat/utils/stream-handlers/metadata-handlers.ts +++ b/apps/web/src/domains/chat/utils/stream-handlers/metadata-handlers.ts @@ -1,26 +1,14 @@ import type { ContextWindowUsage } from "@/domains/chat/components/context-window-indicator"; import { saveContextWindowUsage } from "@/domains/chat/utils/context-window-storage"; -import { - extractConversationId, - postLocalNotification, - sendNotificationIntentAck, -} from "@/runtime/notifications"; -import { patchConversation } from "@/domains/conversations/conversation-queries"; import type { StreamHandlerContext } from "@/domains/chat/utils/stream-handlers/types"; import type { CompactionCircuitClosedEvent, CompactionCircuitOpenEvent, } from "@vellumai/assistant-api"; import type { - AvatarUpdatedEvent, - ConversationTitleUpdatedEvent, - DiskPressureStatusChangedEvent, - IdentityChangedEvent, - NotificationIntentEvent, TurnProfileAutoRoutedEvent, UsageUpdateEvent, } from "@/types/event-types"; -import { useConversationStore } from "@/stores/conversation-store"; export function handleUsageUpdate( event: UsageUpdateEvent, @@ -56,59 +44,6 @@ export function handleUsageUpdate( ctx.setContextWindowUsage(usage); } -export function handleConversationTitleUpdated( - event: ConversationTitleUpdatedEvent, - ctx: StreamHandlerContext, -): void { - // `patchConversation` looks up the Conversation entity by its - // `conversationId` field in the React Query cache. The event's - // `conversationId` (hydrated in `event-parser.ts` via - // `withParsedConversationId` or the envelope fallback in `stream.ts`) - // is the value to match against. - patchConversation( - ctx.queryClient, - ctx.assistantIdRef.current, - event.conversationId, - { title: event.title }, - ); -} - -export function handleNotificationIntent( - event: NotificationIntentEvent, - ctx: StreamHandlerContext, -): void { - const streamCtx = ctx.streamContextRef.current; - const ackAssistantId = streamCtx?.assistantId; - - if (event.targetGuardianPrincipalId) { - if (ackAssistantId && event.deliveryId) { - void sendNotificationIntentAck(ackAssistantId, event.deliveryId, true); - } - return; - } - - const metadataConversationId = extractConversationId(event.deepLinkMetadata); - if ( - metadataConversationId && - metadataConversationId === - useConversationStore.getState().activeConversationId - ) { - if (ackAssistantId && event.deliveryId) { - void sendNotificationIntentAck(ackAssistantId, event.deliveryId, true); - } - return; - } - - void postLocalNotification({ - title: event.title, - body: event.body, - sourceEventName: event.sourceEventName, - deliveryId: event.deliveryId, - deepLinkMetadata: event.deepLinkMetadata, - assistantId: ackAssistantId, - }); -} - export function handleCompactionCircuitOpen( event: CompactionCircuitOpenEvent, ctx: StreamHandlerContext, @@ -123,27 +58,6 @@ export function handleCompactionCircuitClosed( ctx.setCompactionCircuitOpenUntil(null); } -export function handleDiskPressureStatusChanged( - event: DiskPressureStatusChangedEvent, - ctx: StreamHandlerContext, -): void { - ctx.applyDiskPressureStatusEvent(event.status); -} - -export function handleIdentityChanged( - _event: IdentityChangedEvent, - ctx: StreamHandlerContext, -): void { - void ctx.refreshAssistantIdentity(true); -} - -export function handleAvatarUpdated( - _event: AvatarUpdatedEvent, - ctx: StreamHandlerContext, -): void { - ctx.invalidateAvatar(); -} - export function handleTurnProfileAutoRouted( event: TurnProfileAutoRoutedEvent, ctx: StreamHandlerContext, diff --git a/apps/web/src/domains/chat/utils/stream-handlers/test-helpers.ts b/apps/web/src/domains/chat/utils/stream-handlers/test-helpers.ts index 4fbdd508737..7cf6081cf3d 100644 --- a/apps/web/src/domains/chat/utils/stream-handlers/test-helpers.ts +++ b/apps/web/src/domains/chat/utils/stream-handlers/test-helpers.ts @@ -62,9 +62,6 @@ export function makeCtx( scheduleConversationListRefetch: mock(() => {}), queryClient: new QueryClient(), setCompactionCircuitOpenUntil: mock(() => {}), - applyDiskPressureStatusEvent: mock(() => {}), - refreshAssistantIdentity: mock(() => Promise.resolve()), - invalidateAvatar: mock(() => {}), pendingQueuedMessageIdsRef: { current: [] }, requestIdToMessageIdRef: { current: new Map() }, pendingLocalDeletionsRef: { current: new Set() }, diff --git a/apps/web/src/domains/chat/utils/stream-handlers/types.ts b/apps/web/src/domains/chat/utils/stream-handlers/types.ts index a2ff2e11f8e..fd204435252 100644 --- a/apps/web/src/domains/chat/utils/stream-handlers/types.ts +++ b/apps/web/src/domains/chat/utils/stream-handlers/types.ts @@ -8,7 +8,6 @@ import type { ContextWindowUsage } from "@/domains/chat/components/context-windo import type { DisplayMessage } from "@/domains/chat/utils/reconcile"; import type { TurnActions, TurnState } from "@/domains/chat/turn-store"; import type { EndTurnArgs } from "@/domains/chat/turn-coordinator"; -import type { DiskPressureStatusEventPayload } from "@/assistant/use-disk-pressure-monitor"; import type { ChatError, PendingQuestionState } from "@/domains/chat/types"; import type { ChatEventStream } from "@/lib/streaming/stream-transport"; @@ -85,13 +84,6 @@ export interface StreamHandlerContext { // --- Compaction --- setCompactionCircuitOpenUntil: Dispatch>; - // --- External callbacks --- - applyDiskPressureStatusEvent: ( - payload: DiskPressureStatusEventPayload, - ) => void; - refreshAssistantIdentity: (force?: boolean) => Promise; - invalidateAvatar: () => void; - // --- Queue management --- pendingQueuedMessageIdsRef: MutableRefObject; requestIdToMessageIdRef: MutableRefObject>; diff --git a/apps/web/src/domains/conversations/use-conversation-sync.test.tsx b/apps/web/src/domains/conversations/use-conversation-sync.test.tsx index d1fd847d5ee..0771cc1a2f0 100644 --- a/apps/web/src/domains/conversations/use-conversation-sync.test.tsx +++ b/apps/web/src/domains/conversations/use-conversation-sync.test.tsx @@ -6,6 +6,8 @@ import { cleanup, renderHook, waitFor } from "@testing-library/react"; import { conversationGroupsQueryKey, } from "@/domains/conversations/conversation-queries"; +import type { AssistantEvent } from "@/types/event-types"; +import type { Conversation } from "@/types/conversation-types"; import { conversationsQueryKey } from "@/lib/sync/query-tags"; import { SYNC_TAGS, type SyncChangedEvent } from "@/lib/sync/types"; import { @@ -291,4 +293,27 @@ describe("useConversationSync", () => { await Promise.resolve(); expect(spy).not.toHaveBeenCalled(); }); + + test("patches conversation title in cache on conversation_title_updated", async () => { + const queryClient = freshQueryClient(); + queryClient.setQueryData( + conversationsQueryKey("asst-1"), + [{ conversationId: "conv-1", title: "Old Title" } as Conversation], + ); + renderHook(() => useConversationSync("asst-1", true), { + wrapper: createWrapper(queryClient), + }); + useEventBusStore.getState().publish("sse.event", { + type: "conversation_title_updated", + conversationId: "conv-1", + title: "New Title", + } as unknown as AssistantEvent); + await waitFor(() => { + const cached = queryClient.getQueryData( + conversationsQueryKey("asst-1"), + ); + const conv = cached?.find((c) => c.conversationId === "conv-1"); + expect(conv?.title).toBe("New Title"); + }); + }); }); diff --git a/apps/web/src/domains/conversations/use-conversation-sync.ts b/apps/web/src/domains/conversations/use-conversation-sync.ts index 97b4ba91af2..f709e4c588b 100644 --- a/apps/web/src/domains/conversations/use-conversation-sync.ts +++ b/apps/web/src/domains/conversations/use-conversation-sync.ts @@ -1,10 +1,11 @@ /** * Domain-scoped bus consumer for conversation cache invalidation. * - * Routes `conversationsList` umbrella tags and per-conversation - * `conversation::metadata` tags into TanStack Query cache - * operations. Debounces list-level invalidations so rapid-fire - * sync_changed bursts collapse into a single refetch. + * Routes `conversationsList` umbrella tags, per-conversation + * `conversation::metadata` tags, and the `conversation_title_updated` + * event into TanStack Query cache operations. Debounces list-level + * invalidations so rapid-fire sync_changed bursts collapse into a + * single refetch. * * Also handles SSE reconnect (`sse.opened`) by scheduling a debounced * conversation list refetch to catch events missed during the gap. @@ -24,6 +25,7 @@ import { useQueryClient } from "@tanstack/react-query"; import { conversationGroupsQueryKey, + patchConversation, refreshConversationRow, } from "@/domains/conversations/conversation-queries"; import { useBusSubscription } from "@/hooks/use-bus-subscription"; @@ -65,13 +67,26 @@ export function useConversationSync( useBusSubscription("sse.event", (event) => { if (!assistantId || !isAssistantActive) return; - if (event.type !== "sync_changed") return; - handleConversationSyncTags( - event, - assistantId, - queryClient, - debounceTimerRef, - ); + + switch (event.type) { + case "sync_changed": + handleConversationSyncTags( + event, + assistantId, + queryClient, + debounceTimerRef, + ); + return; + + case "conversation_title_updated": + patchConversation( + queryClient, + assistantId, + event.conversationId, + { title: event.title }, + ); + return; + } }); useBusSubscription("sse.opened", ({ cause }) => { diff --git a/apps/web/src/domains/home/hooks/use-home-state-query.ts b/apps/web/src/domains/home/hooks/use-home-state-query.ts index 3293eb10731..766944a2fae 100644 --- a/apps/web/src/domains/home/hooks/use-home-state-query.ts +++ b/apps/web/src/domains/home/hooks/use-home-state-query.ts @@ -1,15 +1,10 @@ import { useCallback } from "react"; import { useQuery, useQueryClient } from "@tanstack/react-query"; +import { homeStateQueryKey } from "@/lib/sync/query-tags"; import { fetchRelationshipState } from "../api"; import type { RelationshipState } from "../types"; -const QUERY_KEY_PREFIX = "home-state" as const; - -function homeStateQueryKey(assistantId: string) { - return [QUERY_KEY_PREFIX, assistantId] as const; -} - /** * React Query hook for the assistant relationship state (tier, facts, * capabilities, conversation count, etc.). diff --git a/apps/web/src/hooks/use-assistant-resource-sync.test.tsx b/apps/web/src/hooks/use-assistant-resource-sync.test.tsx index 7eec9bdbee5..4458f7a09d3 100644 --- a/apps/web/src/hooks/use-assistant-resource-sync.test.tsx +++ b/apps/web/src/hooks/use-assistant-resource-sync.test.tsx @@ -12,6 +12,7 @@ import { assistantSoundsConfigQueryKey, avatarQueryKey, HOME_FEED_QUERY_KEY_PREFIX, + HOME_STATE_QUERY_KEY_PREFIX, } from "@/lib/sync/query-tags"; import { SYNC_TAGS, type SyncChangedEvent } from "@/lib/sync/types"; import { @@ -136,7 +137,7 @@ describe("useAssistantResourceSync", () => { }); }); - test("invalidates home-feed queries on home_feed_updated and relationship_state_updated", async () => { + test("invalidates home-feed query on home_feed_updated", async () => { const queryClient = freshQueryClient(); const spy = mock(() => Promise.resolve()); queryClient.invalidateQueries = spy as never; @@ -148,18 +149,67 @@ describe("useAssistantResourceSync", () => { updatedAt: "2026-05-21T00:00:00Z", newItemCount: 1, } as unknown as AssistantEvent); + await waitFor(() => { + expect(spy).toHaveBeenCalledWith({ + queryKey: [HOME_FEED_QUERY_KEY_PREFIX], + }); + }); + }); + + test("invalidates both home-feed and home-state on relationship_state_updated", async () => { + const queryClient = freshQueryClient(); + const calls: unknown[][] = []; + queryClient.invalidateQueries = ((arg: unknown) => { + calls.push([arg]); + return Promise.resolve(); + }) as never; + renderHook(() => useAssistantResourceSync("asst-1", true), { + wrapper: createWrapper(queryClient), + }); emit({ type: "relationship_state_updated", updatedAt: "2026-05-21T00:00:00Z", } as unknown as AssistantEvent); await waitFor(() => { - const homeCalls = (spy.mock.calls as unknown as Array<[unknown]>).filter( - (call) => { - const arg = call[0] as { queryKey: readonly unknown[] } | undefined; - return arg?.queryKey?.[0] === HOME_FEED_QUERY_KEY_PREFIX; - }, + const queryKeys = calls.map( + ([arg]) => (arg as { queryKey: readonly unknown[] }).queryKey, + ); + expect(queryKeys).toEqual( + expect.arrayContaining([ + [HOME_FEED_QUERY_KEY_PREFIX], + [HOME_STATE_QUERY_KEY_PREFIX], + ]) as never, ); - expect(homeCalls.length).toBe(2); + }); + }); + + test("invalidates identity query on identity_changed event", async () => { + const queryClient = freshQueryClient(); + const spy = mock(() => Promise.resolve()); + queryClient.invalidateQueries = spy as never; + renderHook(() => useAssistantResourceSync("asst-1", true), { + wrapper: createWrapper(queryClient), + }); + emit({ type: "identity_changed" } as unknown as AssistantEvent); + await waitFor(() => { + expect(spy).toHaveBeenCalledWith({ + queryKey: assistantIdentityQueryKey("asst-1"), + }); + }); + }); + + test("invalidates avatar query on avatar_updated event", async () => { + const queryClient = freshQueryClient(); + const spy = mock(() => Promise.resolve()); + queryClient.invalidateQueries = spy as never; + renderHook(() => useAssistantResourceSync("asst-1", true), { + wrapper: createWrapper(queryClient), + }); + emit({ type: "avatar_updated" } as unknown as AssistantEvent); + await waitFor(() => { + expect(spy).toHaveBeenCalledWith({ + queryKey: avatarQueryKey("asst-1"), + }); }); }); diff --git a/apps/web/src/hooks/use-assistant-resource-sync.ts b/apps/web/src/hooks/use-assistant-resource-sync.ts index c6aa68f5a2f..b4f69db67f4 100644 --- a/apps/web/src/hooks/use-assistant-resource-sync.ts +++ b/apps/web/src/hooks/use-assistant-resource-sync.ts @@ -1,11 +1,13 @@ /** * Bus consumer for assistant-level resource cache invalidation. * - * Routes avatar, identity, config, sounds, and schedules - * `sync_changed` tags — plus `home_feed_updated` and - * `relationship_state_updated` events — into TanStack Query cache - * invalidations. All operations are simple one-liner invalidations - * with no debouncing or per-row patching. + * Routes `sync_changed` tags (avatar, identity, config, sounds, + * schedules) and discrete SSE events (`home_feed_updated`, + * `relationship_state_updated`, `identity_changed`, `avatar_updated`) + * into TanStack Query cache invalidations. + * + * All operations are stateless one-liner invalidations with no + * debouncing or per-row patching. * * More complex sync domains (conversations, feature flags) own their * own hooks: @@ -29,6 +31,7 @@ import { assistantSoundsConfigQueryKey, avatarQueryKey, HOME_FEED_QUERY_KEY_PREFIX, + HOME_STATE_QUERY_KEY_PREFIX, } from "@/lib/sync/query-tags"; import { SYNC_TAGS } from "@/lib/sync/types"; @@ -36,10 +39,10 @@ import { SYNC_TAGS } from "@/lib/sync/types"; * Subscribes to assistant-resource sync events via the event bus. * * Handles `sync_changed` tags for avatar, identity, config, sounds, - * and schedules, plus `home_feed_updated` / `relationship_state_updated` - * event types. These are all stateless invalidations — no reconnect - * handling needed since the underlying `useQuery` hooks refetch - * automatically when the query becomes stale. + * and schedules, plus discrete event types for home feed/state changes + * and identity/avatar pushes. These are all stateless invalidations — + * no reconnect handling needed since the underlying `useQuery` hooks + * refetch automatically when the query becomes stale. */ export function useAssistantResourceSync( assistantId: string | null, @@ -90,10 +93,30 @@ export function useAssistantResourceSync( return; case "home_feed_updated": + void queryClient.invalidateQueries({ + queryKey: [HOME_FEED_QUERY_KEY_PREFIX], + }); + return; + case "relationship_state_updated": void queryClient.invalidateQueries({ queryKey: [HOME_FEED_QUERY_KEY_PREFIX], }); + void queryClient.invalidateQueries({ + queryKey: [HOME_STATE_QUERY_KEY_PREFIX], + }); + return; + + case "identity_changed": + void queryClient.invalidateQueries({ + queryKey: assistantIdentityQueryKey(assistantId), + }); + return; + + case "avatar_updated": + void queryClient.invalidateQueries({ + queryKey: avatarQueryKey(assistantId), + }); return; } }); diff --git a/apps/web/src/hooks/use-document-editor-sync.ts b/apps/web/src/hooks/use-document-editor-sync.ts new file mode 100644 index 00000000000..3ab779ced8f --- /dev/null +++ b/apps/web/src/hooks/use-document-editor-sync.ts @@ -0,0 +1,30 @@ +/** + * Bus consumer for `document_editor_update` SSE events. + * + * Applies streamed document content updates to the viewer store. + * The daemon sends incremental markdown content (append or replace + * mode) as the assistant edits a document surface. + * + * References: + * - EVENT_BUS.md — bus subscription contract + * - stores/viewer-store.ts — document editor state + */ + +import { useBusSubscription } from "@/hooks/use-bus-subscription"; +import { useViewerStore } from "@/stores/viewer-store"; + +/** + * Subscribes to `document_editor_update` SSE events via the event bus + * and forwards content updates to the viewer store. + * + * Unlike other bus subscribers, this takes no `assistantId` — the viewer + * store is global and document surface ids are globally unique. + */ +export function useDocumentEditorSync(): void { + useBusSubscription("sse.event", (event) => { + if (event.type !== "document_editor_update") return; + useViewerStore + .getState() + .updateDocumentContent(event.surfaceId, event.markdown, event.mode); + }); +} diff --git a/apps/web/src/hooks/use-notification-intent-sync.ts b/apps/web/src/hooks/use-notification-intent-sync.ts new file mode 100644 index 00000000000..b1e404f1908 --- /dev/null +++ b/apps/web/src/hooks/use-notification-intent-sync.ts @@ -0,0 +1,77 @@ +/** + * Bus consumer for `notification_intent` SSE events. + * + * Turns daemon-pushed notification intents into local browser or + * Capacitor notifications. Skips guardian-scoped notifications + * (the web client does not participate in guardian binding) and + * notifications targeting the conversation the user is actively + * viewing (verified by both store state and URL pathname, since + * `activeConversationId` persists across route changes). + * + * Acks every notification back to the daemon so delivery audit + * trails stay consistent with the macOS client. + * + * References: + * - EVENT_BUS.md — bus subscription contract + * - runtime/notifications.ts — notification scheduling and ack API + */ + +import { useBusSubscription } from "@/hooks/use-bus-subscription"; +import { + extractConversationId, + postLocalNotification, + sendNotificationIntentAck, +} from "@/runtime/notifications"; +import { useConversationStore } from "@/stores/conversation-store"; + +/** + * Subscribes to `notification_intent` SSE events via the event bus + * and schedules local notifications. + * + * @param assistantId — current assistant; `null` disables the subscription + */ +export function useNotificationIntentSync( + assistantId: string | null, +): void { + useBusSubscription("sse.event", (event) => { + if (event.type !== "notification_intent") return; + + // Guardian-scoped notifications are for devices bound to that + // guardian identity. The web/Capacitor client does not participate + // in guardian binding — skip to avoid leaking to unintended devices. + if (event.targetGuardianPrincipalId) { + if (assistantId && event.deliveryId) { + void sendNotificationIntentAck(assistantId, event.deliveryId, true); + } + return; + } + + // Suppress the banner when the user is already viewing the target + // conversation. `activeConversationId` is never cleared on navigation, + // so we also verify the URL matches the conversation route — otherwise + // a stale id would suppress notifications on home/settings/etc. + const metadataConversationId = extractConversationId( + event.deepLinkMetadata, + ); + if ( + metadataConversationId && + metadataConversationId === + useConversationStore.getState().activeConversationId && + window.location.pathname.startsWith("/assistant/conversations/") + ) { + if (assistantId && event.deliveryId) { + void sendNotificationIntentAck(assistantId, event.deliveryId, true); + } + return; + } + + void postLocalNotification({ + title: event.title, + body: event.body, + sourceEventName: event.sourceEventName, + deliveryId: event.deliveryId, + deepLinkMetadata: event.deepLinkMetadata, + assistantId: assistantId ?? undefined, + }); + }); +} diff --git a/apps/web/src/lib/sync/query-tags.ts b/apps/web/src/lib/sync/query-tags.ts index 56bce24be47..2cc4db70c62 100644 --- a/apps/web/src/lib/sync/query-tags.ts +++ b/apps/web/src/lib/sync/query-tags.ts @@ -66,6 +66,12 @@ export function homeFeedQueryKey(assistantId: string) { return [HOME_FEED_QUERY_KEY_PREFIX, assistantId] as const; } +export const HOME_STATE_QUERY_KEY_PREFIX = "home-state" as const; + +export function homeStateQueryKey(assistantId: string) { + return [HOME_STATE_QUERY_KEY_PREFIX, assistantId] as const; +} + export function invalidateAssistantConfigQueries( queryClient: QueryClient, assistantId: string | null | undefined, diff --git a/apps/web/src/root-layout.tsx b/apps/web/src/root-layout.tsx index bec1a0db1e4..a6c006810b1 100644 --- a/apps/web/src/root-layout.tsx +++ b/apps/web/src/root-layout.tsx @@ -11,6 +11,8 @@ import { import { useAuthStore } from "@/stores/auth-store"; import { useEnvironmentStore } from "@/stores/environment-store"; import { useAssistantResourceSync } from "@/hooks/use-assistant-resource-sync"; +import { useDocumentEditorSync } from "@/hooks/use-document-editor-sync"; +import { useNotificationIntentSync } from "@/hooks/use-notification-intent-sync"; import { useConversationSync } from "@/domains/conversations/use-conversation-sync"; import { useFeatureFlagBusSync } from "@/hooks/use-feature-flag-bus-sync"; import { useClientFeatureFlagSync } from "@/hooks/use-client-feature-flag-sync"; @@ -87,6 +89,8 @@ export function RootLayout() { useAssistantResourceSync(lifecycle.assistantId, isAssistantActive); useConversationSync(lifecycle.assistantId, isAssistantActive); useFeatureFlagBusSync(lifecycle.assistantId, isAssistantActive); + useNotificationIntentSync(lifecycle.assistantId); + useDocumentEditorSync(); useEventBusInit({ assistantId: lifecycle.assistantId,