diff --git a/apps/desktop/AGENTS.md b/apps/desktop/AGENTS.md index 3e897a4f5ea..aa7a8c5bbcd 100644 --- a/apps/desktop/AGENTS.md +++ b/apps/desktop/AGENTS.md @@ -1,4 +1,43 @@ # Implementation details -For Electron interprocess communnication, ALWAYS use trpc as defined in `src/lib/trpc` +For Electron interprocess communication, ALWAYS use trpc as defined in `src/lib/trpc` Please use alias as defined in `tsconfig.json` when possible -Prefer zustand for state management if it makes sense. Do not use effect unless absolutely necessary. \ No newline at end of file +Prefer zustand for state management if it makes sense. Do not use effect unless absolutely necessary. + +## tRPC Subscriptions (trpc-electron) + +**Important:** While standard tRPC recommends async generators for subscriptions, `trpc-electron` (used for Electron IPC) **only supports observables**. The library explicitly checks `isObservable(result)` and throws an error otherwise. Use the `observable` pattern: + +```typescript +// CORRECT for trpc-electron - use observable pattern +import { observable } from "@trpc/server/observable"; + +export const createMyRouter = () => { + return router({ + subscribe: publicProcedure.subscription(() => { + return observable((emit) => { + const handler = (data: MyData) => { + emit.next({ type: "my-event", data }); + }; + + myEmitter.on("my-event", handler); + + return () => { + myEmitter.off("my-event", handler); + }; + }); + }), + }); +}; + +// WRONG for trpc-electron - async generators don't work with IPC transport +export const createMyRouter = () => { + return router({ + subscribe: publicProcedure.subscription(async function* () { + // This will NOT work - the generator never gets invoked + while (true) { + yield await getNextEvent(); + } + }), + }); +}; +``` \ No newline at end of file diff --git a/apps/desktop/src/lib/trpc/routers/notifications.ts b/apps/desktop/src/lib/trpc/routers/notifications.ts index e658226e639..eb539b8122a 100644 --- a/apps/desktop/src/lib/trpc/routers/notifications.ts +++ b/apps/desktop/src/lib/trpc/routers/notifications.ts @@ -1,3 +1,4 @@ +import { observable } from "@trpc/server/observable"; import { type AgentCompleteEvent, type NotificationIds, @@ -15,36 +16,27 @@ type NotificationEvent = export const createNotificationsRouter = () => { return router({ - subscribe: publicProcedure.subscription(async function* () { - const queue: NotificationEvent[] = []; + subscribe: publicProcedure.subscription(() => { + return observable((emit) => { + const onComplete = (data: AgentCompleteEvent) => { + emit.next({ type: NOTIFICATION_EVENTS.AGENT_COMPLETE, data }); + }; - const onComplete = (data: AgentCompleteEvent) => { - queue.push({ type: NOTIFICATION_EVENTS.AGENT_COMPLETE, data }); - }; + const onFocusTab = (data: NotificationIds) => { + emit.next({ type: NOTIFICATION_EVENTS.FOCUS_TAB, data }); + }; - const onFocusTab = (data: NotificationIds) => { - queue.push({ type: NOTIFICATION_EVENTS.FOCUS_TAB, data }); - }; + notificationsEmitter.on(NOTIFICATION_EVENTS.AGENT_COMPLETE, onComplete); + notificationsEmitter.on(NOTIFICATION_EVENTS.FOCUS_TAB, onFocusTab); - notificationsEmitter.on(NOTIFICATION_EVENTS.AGENT_COMPLETE, onComplete); - notificationsEmitter.on(NOTIFICATION_EVENTS.FOCUS_TAB, onFocusTab); - - try { - while (true) { - const event = queue.shift(); - if (event) { - yield event; - } else { - await new Promise((resolve) => setTimeout(resolve, 100)); - } - } - } finally { - notificationsEmitter.off( - NOTIFICATION_EVENTS.AGENT_COMPLETE, - onComplete, - ); - notificationsEmitter.off(NOTIFICATION_EVENTS.FOCUS_TAB, onFocusTab); - } + return () => { + notificationsEmitter.off( + NOTIFICATION_EVENTS.AGENT_COMPLETE, + onComplete, + ); + notificationsEmitter.off(NOTIFICATION_EVENTS.FOCUS_TAB, onFocusTab); + }; + }); }), }); }; diff --git a/apps/desktop/src/renderer/stores/tabs/useAgentHookListener.ts b/apps/desktop/src/renderer/stores/tabs/useAgentHookListener.ts index 3863f189815..0b158830c49 100644 --- a/apps/desktop/src/renderer/stores/tabs/useAgentHookListener.ts +++ b/apps/desktop/src/renderer/stores/tabs/useAgentHookListener.ts @@ -1,3 +1,4 @@ +import { useRef } from "react"; import { trpc } from "renderer/lib/trpc"; import { useSetActiveWorkspace } from "renderer/react-query/workspaces/useSetActiveWorkspace"; import { NOTIFICATION_EVENTS } from "shared/constants"; @@ -13,6 +14,10 @@ export function useAgentHookListener() { const setActiveWorkspace = useSetActiveWorkspace(); const { data: activeWorkspace } = trpc.workspaces.getActive.useQuery(); + // Use ref to avoid stale closure in subscription callback + const activeWorkspaceRef = useRef(activeWorkspace); + activeWorkspaceRef.current = activeWorkspace; + trpc.notifications.subscribe.useSubscription(undefined, { onData: (event) => { if (!event.data) return; @@ -26,17 +31,16 @@ export function useAgentHookListener() { if (event.type === NOTIFICATION_EVENTS.AGENT_COMPLETE) { if (!paneId) return; - // Only show red dot if not already viewing this pane const activeTabId = state.activeTabIds[workspaceId]; const focusedPaneId = activeTabId && state.focusedPaneIds[activeTabId]; const isAlreadyActive = - activeWorkspace?.id === workspaceId && focusedPaneId === paneId; + activeWorkspaceRef.current?.id === workspaceId && + focusedPaneId === paneId; if (!isAlreadyActive) { state.setNeedsAttention(paneId, true); } } else if (event.type === NOTIFICATION_EVENTS.FOCUS_TAB) { - // Switch to workspace view if not already there const appState = useAppStore.getState(); if (appState.currentView !== "workspace") { appState.setView("workspace");