Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions apps/web/.cross-domain-allowlist.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,6 @@
"src/domains/chat/hooks/use-voice-input.ts": [
"voice"
],
"src/domains/chat/utils/stream-handlers/metadata-handlers.test.ts": [
"conversations"
],
"src/domains/chat/utils/stream-handlers/metadata-handlers.ts": [
"conversations"
],
"src/domains/conversations/use-attention-tracking.ts": [
"chat"
],
Expand Down
10 changes: 10 additions & 0 deletions apps/web/src/assistant/use-disk-pressure-monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
getDiskPressureMonitorMode,
type DiskPressureMonitorMode,
} from "@/assistant/disk-pressure";
import { useBusSubscription } from "@/hooks/use-bus-subscription";
import { useEventBusStore } from "@/stores/event-bus-store";

export interface UseDiskPressureMonitorOptions {
Expand Down Expand Up @@ -204,6 +205,15 @@ export function useDiskPressureMonitor({
[assistantId, applyStatusForAssistant, clearStatus, enabled],
);

// React to daemon-pushed disk pressure events via the event bus.
// Complements the polling interval and resume-refresh above so
// status changes are reflected immediately without waiting for
// the next poll tick.
useBusSubscription("sse.event", (event) => {
if (event.type !== "disk_pressure_status_changed") return;
applyStatusEvent(event.status);
});

const acknowledge = useCallback(async () => {
const requestedAssistantId = assistantId;

Expand Down
3 changes: 0 additions & 3 deletions apps/web/src/domains/chat/chat-page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -646,9 +646,6 @@ export function ChatPage() {
setContextWindowUsage,
scheduleConversationListRefetch,
setCompactionCircuitOpenUntil,
applyDiskPressureStatusEvent: diskPressure.applyStatusEvent,
refreshAssistantIdentity,
invalidateAvatar,
dispatchSyncChanged,
pendingQueuedMessageIdsRef,
requestIdToMessageIdRef,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,6 @@ mock.module(
},
}),
);
mock.module(
"@/domains/chat/utils/stream-handlers/home-handlers",
() => ({
handleHomeFeedUpdated: () => {
handlerCalls.push({ kind: "home_feed_updated" });
},
handleRelationshipStateUpdated: () => {
handlerCalls.push({ kind: "relationship_state_updated" });
},
}),
);

const { useStreamEventHandler } = await import(
"@/domains/chat/hooks/use-stream-event-handler"
);
Expand Down Expand Up @@ -116,9 +104,6 @@ function renderHandler(
setContextWindowUsage: () => {},
scheduleConversationListRefetch: () => {},
setCompactionCircuitOpenUntil: () => {},
applyDiskPressureStatusEvent: () => {},
refreshAssistantIdentity: async () => {},
invalidateAvatar: () => {},
dispatchSyncChanged: () => {},
} as never),
{ wrapper },
Expand Down Expand Up @@ -218,7 +203,7 @@ describe("handleStreamEvent — defense-in-depth conversation routing guard", ()
expect(true).toBe(true);
});

test("forwards a global event (home_feed_updated) regardless of conversation context", () => {
test("no-ops a cross-domain event (home_feed_updated) handled by bus subscribers", () => {
const refs = noopRefs();
const { handleStreamEvent } = renderHandler(refs, {
streamConversationId: null,
Expand All @@ -230,7 +215,10 @@ describe("handleStreamEvent — defense-in-depth conversation routing guard", ()
} as unknown as AssistantEvent,
0,
);
expect(handlerCalls).toContainEqual({ kind: "home_feed_updated" });
// home_feed_updated is now handled by useAssistantResourceSync (bus
// subscriber), not the monolithic handler. The switch case is a no-op.
const homeFeedCalls = handlerCalls.filter((c) => c.kind === "home_feed_updated");
expect(homeFeedCalls).toHaveLength(0);
});

test("rejects events whose epoch is stale (regardless of conversation key)", () => {
Expand Down
89 changes: 18 additions & 71 deletions apps/web/src/domains/chat/hooks/use-stream-event-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,9 @@ import type { DisplayMessage } from "@/domains/chat/utils/reconcile";
import { tailIsStreamingAssistant } from "@/domains/chat/hooks/stream-message-updaters";
import { useTurnStore } from "@/domains/chat/turn-store";
import { endTurn } from "@/domains/chat/turn-coordinator";
import { useViewerStore } from "@/stores/viewer-store";
import type { DiskPressureStatusEventPayload } from "@/assistant/use-disk-pressure-monitor";

import { recordDiagnostic, summarizeAssistantEvent } from "@/lib/diagnostics";
import { isConversationScopedStreamEvent } from "@/domains/chat/utils/chat";
import {
handleHomeFeedUpdated,
handleRelationshipStateUpdated,
} from "@/domains/chat/utils/stream-handlers/home-handlers";
import {
handleOpenUrl,
handleNavigateSettings,
Expand Down Expand Up @@ -56,13 +51,8 @@ import {
} from "@/domains/chat/utils/stream-handlers/tool-call-handlers";
import {
handleUsageUpdate,
handleConversationTitleUpdated,
handleNotificationIntent,
handleCompactionCircuitOpen,
handleCompactionCircuitClosed,
handleDiskPressureStatusChanged,
handleIdentityChanged,
handleAvatarUpdated,
handleTurnProfileAutoRouted,
} from "@/domains/chat/utils/stream-handlers/metadata-handlers";
import {
Expand Down Expand Up @@ -140,12 +130,7 @@ export interface UseStreamEventHandlerParams {
// --- Compaction ---
setCompactionCircuitOpenUntil: Dispatch<SetStateAction<Date | null>>;

// --- External callbacks (stabilized via refs in the hook) ---
applyDiskPressureStatusEvent: (
payload: DiskPressureStatusEventPayload,
) => void;
refreshAssistantIdentity: (force?: boolean) => Promise<void>;
invalidateAvatar: () => void;
// --- Sync router ---
dispatchSyncChanged: (event: AssistantSyncChangedEvent) => void;

// --- Queue management ---
Expand Down Expand Up @@ -195,9 +180,6 @@ export function useStreamEventHandler(
setContextWindowUsage,
scheduleConversationListRefetch,
setCompactionCircuitOpenUntil,
applyDiskPressureStatusEvent,
refreshAssistantIdentity,
invalidateAvatar,
dispatchSyncChanged,
pendingQueuedMessageIdsRef,
requestIdToMessageIdRef,
Expand All @@ -209,18 +191,6 @@ export function useStreamEventHandler(
const toolCallIdCounterRef = useRef(0);
const currentAssistantMessageIdRef = useRef<string | undefined>(undefined);

// Stabilize external callbacks that may not be memoized upstream.
// Storing them in refs keeps handleStreamEvent's identity stable across
// renders while always calling the latest version of each callback.
// Reference: https://react.dev/reference/react/useCallback#preventing-an-effect-from-firing-too-often
const applyDiskPressureStatusEventRef = useRef(applyDiskPressureStatusEvent);
applyDiskPressureStatusEventRef.current = applyDiskPressureStatusEvent;
const refreshAssistantIdentityRef = useRef(refreshAssistantIdentity);
refreshAssistantIdentityRef.current = refreshAssistantIdentity;
const invalidateAvatarRef = useRef(invalidateAvatar);
invalidateAvatarRef.current = invalidateAvatar;


// --- Main event handler ---

const handleStreamEvent = useCallback(
Expand Down Expand Up @@ -311,12 +281,6 @@ export function useStreamEventHandler(
scheduleConversationListRefetch,
queryClient,
setCompactionCircuitOpenUntil,
applyDiskPressureStatusEvent: (...args) =>
applyDiskPressureStatusEventRef.current(...args),
refreshAssistantIdentity: (...args) =>
refreshAssistantIdentityRef.current(...args),
invalidateAvatar: (...args) =>
invalidateAvatarRef.current(...args),
pendingQueuedMessageIdsRef,
requestIdToMessageIdRef,
pendingLocalDeletionsRef,
Expand Down Expand Up @@ -410,27 +374,14 @@ export function useStreamEventHandler(
// to the Electron client and `conversation_list_invalidated`
// is retired from the event types entirely.
break;
case "conversation_title_updated":
handleConversationTitleUpdated(event, ctx);
break;
case "notification_intent":
handleNotificationIntent(event, ctx);
break;

case "compaction_circuit_open":
handleCompactionCircuitOpen(event, ctx);
break;
case "compaction_circuit_closed":
handleCompactionCircuitClosed(event, ctx);
break;
case "disk_pressure_status_changed":
handleDiskPressureStatusChanged(event, ctx);
break;
case "identity_changed":
handleIdentityChanged(event, ctx);
break;
case "avatar_updated":
handleAvatarUpdated(event, ctx);
break;

case "turn_profile_auto_routed":
handleTurnProfileAutoRouted(event, ctx);
break;
Expand All @@ -446,12 +397,7 @@ export function useStreamEventHandler(
case "message_request_complete":
handleMessageRequestComplete(event, ctx);
break;
case "home_feed_updated":
handleHomeFeedUpdated(queryClient, event);
break;
case "relationship_state_updated":
handleRelationshipStateUpdated(queryClient, event);
break;

case "subagent_spawned":
handleSubagentSpawned(event, ctx);
break;
Expand All @@ -464,21 +410,25 @@ export function useStreamEventHandler(
case "sync_changed":
dispatchSyncChanged(event);
break;

// Cross-domain events handled by bus subscribers mounted in
// RootLayout (useAssistantResourceSync, useConversationSync,
// useNotificationIntentSync, useDocumentEditorSync) or
// ChatPage-scoped hooks (useDiskPressureMonitor). The chat
// handler is intentionally a no-op for these.
case "home_feed_updated":
case "relationship_state_updated":
case "identity_changed":
case "avatar_updated":
case "disk_pressure_status_changed":
case "notification_intent":
case "document_editor_update":
useViewerStore.getState().updateDocumentContent(
event.surfaceId,
event.markdown,
event.mode,
);
break;
case "conversation_title_updated":
case "document_comment_created":
case "document_comment_resolved":
case "document_comment_reopened":
case "document_comment_deleted":
case "interaction_resolved":
// Attention reconciliation lives in `useAttentionTracking`, which
// subscribes to the event bus directly. The chat-stream handler
// is intentionally a no-op here.
break;
case "unknown":
break;
Expand All @@ -497,9 +447,6 @@ export function useStreamEventHandler(
// Stable deps listed for correctness — React guarantees identity
// stability for state setters and refs, so these never trigger
// re-creation of the callback.
// Note: applyDiskPressureStatusEvent, refreshAssistantIdentity, and
// invalidateAvatar are accessed via refs (stable identity) and are
// intentionally excluded from this dep array.
dispatchSyncChanged,
queryClient,
streamEpochRef,
Expand Down
20 changes: 0 additions & 20 deletions apps/web/src/domains/chat/utils/stream-handlers/home-handlers.ts

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,16 +1,11 @@
import { describe, expect, it } from "bun:test";

import { makeCtx } from "@/domains/chat/utils/stream-handlers/test-helpers";
import { conversationsQueryKey } from "@/domains/conversations/conversation-queries";
import type { Conversation } from "@/types/conversation-types";

import {
handleUsageUpdate,
handleConversationTitleUpdated,
handleCompactionCircuitOpen,
handleCompactionCircuitClosed,
handleDiskPressureStatusChanged,
handleIdentityChanged,
handleAvatarUpdated,
} from "@/domains/chat/utils/stream-handlers/metadata-handlers";

describe("handleUsageUpdate", () => {
Expand Down Expand Up @@ -71,31 +66,6 @@ describe("handleUsageUpdate", () => {
});
});

describe("handleConversationTitleUpdated", () => {
it("patches conversation title in the conversations query cache", () => {
const ctx = makeCtx();
ctx.queryClient.setQueryData<Conversation[]>(
conversationsQueryKey(ctx.assistantIdRef.current),
[{ conversationId: "conv-1", title: "Old Title" } as Conversation],
);

handleConversationTitleUpdated(
{
type: "conversation_title_updated",
conversationId: "conv-1",
title: "New Title",
},
ctx,
);

const cached = ctx.queryClient.getQueryData<Conversation[]>(
conversationsQueryKey(ctx.assistantIdRef.current),
);
const conv = cached?.find((c) => c.conversationId === "conv-1");
expect(conv?.title).toBe("New Title");
});
});

describe("handleCompactionCircuitOpen", () => {
it("sets compaction circuit open until date", () => {
const ctx = makeCtx();
Expand Down Expand Up @@ -123,29 +93,4 @@ describe("handleCompactionCircuitClosed", () => {
});
});

describe("handleDiskPressureStatusChanged", () => {
it("delegates to applyDiskPressureStatusEvent", () => {
const ctx = makeCtx();
handleDiskPressureStatusChanged(
{ type: "disk_pressure_status_changed", status: null },
ctx,
);
expect(ctx.applyDiskPressureStatusEvent).toHaveBeenCalledWith(null);
});
});

describe("handleIdentityChanged", () => {
it("calls refreshAssistantIdentity with force=true", () => {
const ctx = makeCtx();
handleIdentityChanged({ type: "identity_changed" }, ctx);
expect(ctx.refreshAssistantIdentity).toHaveBeenCalledWith(true);
});
});

describe("handleAvatarUpdated", () => {
it("calls invalidateAvatar", () => {
const ctx = makeCtx();
handleAvatarUpdated({ type: "avatar_updated" }, ctx);
expect(ctx.invalidateAvatar).toHaveBeenCalled();
});
});
Loading