diff --git a/assistant/src/__tests__/conversation-confirmation-signals.test.ts b/assistant/src/__tests__/conversation-confirmation-signals.test.ts index 5d80952357f..220a010fc83 100644 --- a/assistant/src/__tests__/conversation-confirmation-signals.test.ts +++ b/assistant/src/__tests__/conversation-confirmation-signals.test.ts @@ -633,4 +633,83 @@ describe("restoreBrowserProxyAvailability", () => { // browser proxy. expect(bashProxy.isAvailable()).toBe(false); }); + + test("uses hostBrowserSenderOverride when set so drain-queue restores preserve the registry-routed sender", () => { + // Regression (PR #24129 cycle 2): the queue-drain path calls + // `restoreBrowserProxyAvailability()` on dequeue, which used to pass + // `this.sendToClient` (the SSE hub emitter) to the proxy, clobbering the + // chrome-extension registry-routed sender established by the POST + // /messages handler. The override field lets the HTTP handler pin the + // registry-routed sender so the drain path preserves it. + const sseHub: ServerMessage[] = []; + const registry: ServerMessage[] = []; + const conversation = makeConversation((msg) => sseHub.push(msg)); + const browserProxy = new HostBrowserProxy(() => {}); + conversation.setHostBrowserProxy(browserProxy); + + // Simulate updateClient setting sendToClient to the SSE hub and + // marking the conversation as client-less (chrome-extension is + // non-interactive). + conversation.updateClient((msg) => sseHub.push(msg), true); + expect(browserProxy.isAvailable()).toBe(false); + + // The HTTP handler stashes the registry-routed sender as the override. + const registrySender = (msg: ServerMessage) => registry.push(msg); + conversation.hostBrowserSenderOverride = registrySender; + + // Drain-queue path calls restoreBrowserProxyAvailability — it must now + // prefer the override over sendToClient. + conversation.restoreBrowserProxyAvailability(); + expect(browserProxy.isAvailable()).toBe(true); + + // Send a frame through the proxy and verify it flows through the + // registry sender, not the SSE hub. + const internalSend = ( + browserProxy as unknown as { + sendToClient: (msg: ServerMessage) => void; + } + ).sendToClient; + const probe: ServerMessage = { + type: "host_browser_cancel", + requestId: "probe-1", + } as ServerMessage; + internalSend(probe); + expect(registry).toHaveLength(1); + expect(sseHub.some((m) => m === probe)).toBe(false); + }); + + test("falls back to sendToClient when hostBrowserSenderOverride is cleared", () => { + // When a non-chrome-extension turn takes over, the HTTP handler clears + // the override and restoreBrowserProxyAvailability must fall back to + // sendToClient (the SSE hub), otherwise macOS turns would route their + // host_browser frames through the stale chrome-extension registry. + const sseHub: ServerMessage[] = []; + const conversation = makeConversation((msg) => sseHub.push(msg)); + const browserProxy = new HostBrowserProxy(() => {}); + conversation.setHostBrowserProxy(browserProxy); + + // First the chrome-extension path pins the override. + const registry: ServerMessage[] = []; + conversation.hostBrowserSenderOverride = (msg) => registry.push(msg); + conversation.updateClient((msg) => sseHub.push(msg), true); + conversation.restoreBrowserProxyAvailability(); + + // Then a macOS handoff clears the override. + conversation.hostBrowserSenderOverride = undefined; + conversation.updateClient((msg) => sseHub.push(msg), false); + conversation.restoreBrowserProxyAvailability(); + + const internalSend = ( + browserProxy as unknown as { + sendToClient: (msg: ServerMessage) => void; + } + ).sendToClient; + const probe: ServerMessage = { + type: "host_browser_cancel", + requestId: "probe-2", + } as ServerMessage; + internalSend(probe); + expect(sseHub).toContain(probe); + expect(registry).not.toContain(probe); + }); }); diff --git a/assistant/src/browser-extension-relay/server.ts b/assistant/src/browser-extension-relay/server.ts index b6d35c2aa11..5ccec8e0d83 100644 --- a/assistant/src/browser-extension-relay/server.ts +++ b/assistant/src/browser-extension-relay/server.ts @@ -28,6 +28,14 @@ interface PendingCommand { export interface BrowserRelayWebSocketData { wsType: "browser-relay"; connectionId: string; + /** + * Guardian identity derived from the JWT claims at WebSocket upgrade + * time. Used by the ChromeExtensionRegistry (runtime/) to route + * host_browser_request frames to the correct extension. Undefined when + * HTTP auth is disabled (dev bypass) or when the token's sub cannot be + * parsed into an actor principal. + */ + guardianId?: string; } export interface ExtensionRelayStatus { diff --git a/assistant/src/daemon/conversation.ts b/assistant/src/daemon/conversation.ts index 6f68b8baac9..fbe24f40cee 100644 --- a/assistant/src/daemon/conversation.ts +++ b/assistant/src/daemon/conversation.ts @@ -184,6 +184,19 @@ export class Conversation { /** @internal */ hostBrowserProxy?: HostBrowserProxy; /** @internal */ hostCuProxy?: HostCuProxy; /** @internal */ hostFileProxy?: HostFileProxy; + /** + * Optional override sender used by `restoreBrowserProxyAvailability` so + * non-SSE transports (e.g. chrome-extension, whose host_browser_request + * frames flow through the ChromeExtensionRegistry WebSocket rather than + * the SSE hub) can preserve their registry-routed sender across drain + * queue restores. When set, `restoreBrowserProxyAvailability()` uses this + * function instead of `sendToClient` so the drain-queue path doesn't + * clobber the chrome-extension sender with the SSE hub emitter. + * + * Populated by the POST /messages handler for chrome-extension turns and + * cleared when an unrelated interface takes over (see `updateClient`). + */ + /** @internal */ hostBrowserSenderOverride?: (msg: ServerMessage) => void; /** @internal */ cesClient?: CesClient; /** @internal */ readonly queue = new MessageQueue(); /** @internal */ currentActiveSurfaceId?: string; @@ -559,11 +572,21 @@ export class Conversation { * it available would be to flip `hasNoClient` false, which would * incorrectly enable host_bash/host_file/host_cu tool gating downstream. * + * When `hostBrowserSenderOverride` is set, that function is used as the + * sender instead of `sendToClient`. This is required for the + * chrome-extension interface whose host_browser frames route through the + * ChromeExtensionRegistry WebSocket rather than the SSE hub: if the + * queue-drain path called this helper with `sendToClient`, the + * registry-routed sender established at turn-start would be clobbered by + * the SSE hub emitter and host_browser_request frames would stop + * reaching the extension. + * * Callers must only invoke this when they know the current interface * supports host_browser (see `supportsHostProxy(id, "host_browser")`). */ restoreBrowserProxyAvailability(): void { - this.hostBrowserProxy?.updateSender(this.sendToClient, true); + const sender = this.hostBrowserSenderOverride ?? this.sendToClient; + this.hostBrowserProxy?.updateSender(sender, true); } setSubagentAllowedTools(tools: Set | undefined): void { diff --git a/assistant/src/runtime/__tests__/chrome-extension-registry.test.ts b/assistant/src/runtime/__tests__/chrome-extension-registry.test.ts new file mode 100644 index 00000000000..5a85071e417 --- /dev/null +++ b/assistant/src/runtime/__tests__/chrome-extension-registry.test.ts @@ -0,0 +1,162 @@ +import { beforeEach, describe, expect, test } from "bun:test"; + +import type { ServerMessage } from "../../daemon/message-protocol.js"; +import { + __resetChromeExtensionRegistryForTests, + type ChromeExtensionConnection, + ChromeExtensionRegistry, + getChromeExtensionRegistry, +} from "../chrome-extension-registry.js"; + +// Minimal structural stand-in for Bun's ServerWebSocket. Only the methods +// the registry touches (`send`, `close`) are modeled; the rest of the Bun +// ServerWebSocket API is out of scope for these unit tests. +interface FakeWs { + send: (data: string) => number; + close: (code?: number, reason?: string) => void; + sent: string[]; + closed: { code?: number; reason?: string }[]; + sendShouldThrow?: boolean; +} + +function makeFakeWs(): FakeWs { + const sent: string[] = []; + const closed: { code?: number; reason?: string }[] = []; + const ws: FakeWs = { + sent, + closed, + send(data: string) { + if (ws.sendShouldThrow) { + throw new Error("simulated ws.send failure"); + } + sent.push(data); + return data.length; + }, + close(code?: number, reason?: string) { + closed.push({ code, reason }); + }, + }; + return ws; +} + +function makeConnection( + guardianId: string, + id?: string, +): { conn: ChromeExtensionConnection; fakeWs: FakeWs } { + const fakeWs = makeFakeWs(); + const conn: ChromeExtensionConnection = { + id: id ?? crypto.randomUUID(), + guardianId, + ws: fakeWs as unknown as ChromeExtensionConnection["ws"], + connectedAt: Date.now(), + }; + return { conn, fakeWs }; +} + +describe("ChromeExtensionRegistry", () => { + beforeEach(() => { + __resetChromeExtensionRegistryForTests(); + }); + + test("register stores the connection under the guardianId", () => { + const registry = new ChromeExtensionRegistry(); + const { conn } = makeConnection("guardian-alpha"); + registry.register(conn); + expect(registry.get("guardian-alpha")).toBe(conn); + }); + + test("unregister removes the connection", () => { + const registry = new ChromeExtensionRegistry(); + const { conn } = makeConnection("guardian-alpha"); + registry.register(conn); + registry.unregister(conn.id); + expect(registry.get("guardian-alpha")).toBeUndefined(); + }); + + test("unregister is a no-op when the connectionId is unknown", () => { + const registry = new ChromeExtensionRegistry(); + // Should not throw even though nothing is registered. + expect(() => registry.unregister("unknown-connection")).not.toThrow(); + }); + + test("registering a second connection for the same guardianId closes the prior one", () => { + const registry = new ChromeExtensionRegistry(); + const { conn: conn1, fakeWs: fakeWs1 } = makeConnection( + "guardian-alpha", + "conn-1", + ); + const { conn: conn2 } = makeConnection("guardian-alpha", "conn-2"); + registry.register(conn1); + registry.register(conn2); + // Prior connection should have been closed with code 1000. + expect(fakeWs1.closed).toHaveLength(1); + expect(fakeWs1.closed[0].code).toBe(1000); + // Registry should hold the new connection. + expect(registry.get("guardian-alpha")).toBe(conn2); + }); + + test("registering the same connection id twice is idempotent and does not close itself", () => { + const registry = new ChromeExtensionRegistry(); + const { conn, fakeWs } = makeConnection("guardian-alpha", "conn-1"); + registry.register(conn); + registry.register(conn); + expect(fakeWs.closed).toHaveLength(0); + expect(registry.get("guardian-alpha")).toBe(conn); + }); + + test("send returns false when no connection exists for the guardian", () => { + const registry = new ChromeExtensionRegistry(); + const msg: ServerMessage = { + type: "host_browser_cancel", + requestId: "req-1", + } as ServerMessage; + expect(registry.send("missing-guardian", msg)).toBe(false); + }); + + test("send returns true and forwards the JSON-serialized message when a connection exists", () => { + const registry = new ChromeExtensionRegistry(); + const { conn, fakeWs } = makeConnection("guardian-alpha"); + registry.register(conn); + const msg: ServerMessage = { + type: "host_browser_cancel", + requestId: "req-1", + } as ServerMessage; + const ok = registry.send("guardian-alpha", msg); + expect(ok).toBe(true); + expect(fakeWs.sent).toHaveLength(1); + const parsed = JSON.parse(fakeWs.sent[0]); + expect(parsed.type).toBe("host_browser_cancel"); + expect(parsed.requestId).toBe("req-1"); + }); + + test("send returns false when ws.send throws (best-effort delivery)", () => { + const registry = new ChromeExtensionRegistry(); + const { conn, fakeWs } = makeConnection("guardian-alpha"); + fakeWs.sendShouldThrow = true; + registry.register(conn); + const msg: ServerMessage = { + type: "host_browser_cancel", + requestId: "req-1", + } as ServerMessage; + expect(registry.send("guardian-alpha", msg)).toBe(false); + }); + + test("getChromeExtensionRegistry returns a module-level singleton", () => { + const first = getChromeExtensionRegistry(); + const second = getChromeExtensionRegistry(); + expect(first).toBe(second); + }); + + test("unregister after supersession does not remove the new connection", () => { + // When a new connection supersedes an older one, the close handler for + // the older socket will fire later and call unregister with the OLD id. + // That must not clobber the newer registration. + const registry = new ChromeExtensionRegistry(); + const { conn: old } = makeConnection("guardian-alpha", "old-id"); + const { conn: fresh } = makeConnection("guardian-alpha", "fresh-id"); + registry.register(old); + registry.register(fresh); + registry.unregister("old-id"); + expect(registry.get("guardian-alpha")).toBe(fresh); + }); +}); diff --git a/assistant/src/runtime/chrome-extension-registry.ts b/assistant/src/runtime/chrome-extension-registry.ts new file mode 100644 index 00000000000..bbc742a22fe --- /dev/null +++ b/assistant/src/runtime/chrome-extension-registry.ts @@ -0,0 +1,116 @@ +/** + * Registry mapping guardianId → active Chrome extension WebSocket connections. + * + * Populated by the `/v1/browser-relay` WebSocket upgrade handler when a + * chrome-extension client connects; drained on close. Used by + * conversation-routes.ts to route `host_browser_request` frames to the + * connected extension for the appropriate guardian. + * + * This is the chrome-extension counterpart to the SSE hub used by the macOS + * client for the same purpose. + */ + +import type { ServerWebSocket } from "bun"; + +import type { ServerMessage } from "../daemon/message-protocol.js"; +import { getLogger } from "../util/logger.js"; + +const log = getLogger("chrome-extension-registry"); + +export interface ChromeExtensionConnection { + /** Stable identifier for this WebSocket connection (used for unregister). */ + id: string; + /** Guardian identity this connection is authenticated as. */ + guardianId: string; + /** Underlying Bun WebSocket. */ + ws: ServerWebSocket; + /** Wall-clock timestamp (ms) when the connection was registered. */ + connectedAt: number; +} + +/** + * Module-level registry of active chrome-extension connections keyed by + * guardianId. There is at most one connection per guardian — reconnects + * supersede the prior entry by closing it first. + */ +export class ChromeExtensionRegistry { + private byGuardian = new Map(); + + /** + * Register a chrome-extension WebSocket for a guardian. If a prior + * connection already exists for the same guardianId, it is closed and + * replaced with the new one. + */ + register(conn: ChromeExtensionConnection): void { + const prior = this.byGuardian.get(conn.guardianId); + if (prior && prior.id !== conn.id) { + try { + prior.ws.close(1000, "superseded by new connection"); + } catch { + // Best-effort — the prior socket may already be closed. + } + } + this.byGuardian.set(conn.guardianId, conn); + log.info( + { guardianId: conn.guardianId, connectionId: conn.id }, + "chrome extension registered", + ); + } + + /** + * Remove the entry with the given connectionId. No-op if no connection + * with that id is currently registered — the entry may already have been + * superseded by a newer registration. + */ + unregister(connectionId: string): void { + for (const [key, conn] of this.byGuardian) { + if (conn.id === connectionId) { + this.byGuardian.delete(key); + log.info( + { guardianId: key, connectionId }, + "chrome extension unregistered", + ); + return; + } + } + } + + /** Return the active connection for a guardian, if any. */ + get(guardianId: string): ChromeExtensionConnection | undefined { + return this.byGuardian.get(guardianId); + } + + /** + * Send a ServerMessage to the chrome-extension connection for the given + * guardian. Returns `true` when a connection exists and the send + * succeeds; `false` when no connection is registered or when the + * underlying `ws.send` throws. + */ + send(guardianId: string, msg: ServerMessage): boolean { + const conn = this.byGuardian.get(guardianId); + if (!conn) return false; + try { + conn.ws.send(JSON.stringify(msg)); + return true; + } catch (err) { + log.warn({ guardianId, err }, "failed to send to chrome extension"); + return false; + } + } +} + +// ── Module-level singleton (same pattern as assistant-event-hub) ────────── +let instance: ChromeExtensionRegistry | null = null; + +export function getChromeExtensionRegistry(): ChromeExtensionRegistry { + if (!instance) instance = new ChromeExtensionRegistry(); + return instance; +} + +/** + * Test helper: reset the module-level singleton so each test starts with a + * fresh registry. Not exported from any public index — test-only. + */ +export function __resetChromeExtensionRegistryForTests(): void { + instance = null; +} diff --git a/assistant/src/runtime/http-server.ts b/assistant/src/runtime/http-server.ts index 46827db5ce9..8a0031d68d2 100644 --- a/assistant/src/runtime/http-server.ts +++ b/assistant/src/runtime/http-server.ts @@ -68,12 +68,14 @@ import { assistantEventHub } from "./assistant-event-hub.js"; import { DAEMON_INTERNAL_ASSISTANT_ID } from "./assistant-scope.js"; // Auth import { authenticateRequest } from "./auth/middleware.js"; +import { parseSub } from "./auth/subject.js"; import { mintDaemonDeliveryToken, mintUiPageToken, verifyToken, } from "./auth/token-service.js"; import { sweepFailedEvents } from "./channel-retry-sweep.js"; +import { getChromeExtensionRegistry } from "./chrome-extension-registry.js"; import { httpError } from "./http-errors.js"; import type { RouteDefinition } from "./http-router.js"; import { HttpRouter } from "./http-router.js"; @@ -333,6 +335,19 @@ export class RuntimeHttpServer { extensionRelayServer.handleOpen( ws as ServerWebSocket, ); + // When the JWT sub resolved to a guardian principal at upgrade + // time, also register this connection with the chrome-extension + // registry so host_browser_request frames can be routed to it. + // The legacy ExtensionCommand protocol handled by + // extensionRelayServer continues to work in parallel. + if (data.guardianId) { + getChromeExtensionRegistry().register({ + id: data.connectionId, + guardianId: data.guardianId, + ws, + connectedAt: Date.now(), + }); + } return; } const callSessionId = (data as RelayWebSocketData).callSessionId; @@ -372,6 +387,12 @@ export class RuntimeHttpServer { code, reason?.toString(), ); + // Always attempt to unregister — the registry uses connectionId + // as the key and no-ops if the entry is absent (e.g. when the + // connection was never registered because guardianId was + // undefined, or when it was superseded by a newer registration + // for the same guardian). + getChromeExtensionRegistry().unregister(data.connectionId); return; } const callSessionId = (data as RelayWebSocketData).callSessionId; @@ -635,6 +656,24 @@ export class RuntimeHttpServer { ); } + // When auth is enabled we parse the JWT sub to extract the actor + // principal ID, which we use as the guardianId key for the + // ChromeExtensionRegistry. When auth is disabled (dev bypass), + // guardianId remains undefined and the registration is skipped — + // host_browser_request routing requires an authenticated guardian. + // + // Gateway path: when the WebSocket upgrade is proxied through the + // gateway, the upstream token minted by `mintServiceToken()` has + // `sub=svc:gateway:self` with no actor principal id. In that case + // we fall back to an explicit `x-guardian-id` header / query param + // so the runtime can still register the connection under the real + // guardian. TODO(gateway-plumbing): the gateway's + // `browser-relay-websocket.ts` does not yet forward this header — + // once it does (resolving the actor from the downstream edge token + // at upgrade time), the service-token branch below will start + // picking up the guardianId. Until then, cloud-path registration + // silently no-ops, which is a known limitation tracked for Phase 3. + let guardianId: string | undefined; if (!isHttpAuthDisabled()) { const wsUrl = new URL(req.url); const token = wsUrl.searchParams.get("token"); @@ -645,6 +684,23 @@ export class RuntimeHttpServer { if (!jwtResult.ok) { return httpError("UNAUTHORIZED", "Unauthorized", 401); } + const subResult = parseSub(jwtResult.claims.sub); + if (subResult.ok && subResult.actorPrincipalId) { + // Direct actor principal — this is the loopback / desktop path. + guardianId = subResult.actorPrincipalId; + } else { + // Service-token path (gateway-forwarded). Look for an explicit + // guardian id plumbed by the gateway as a header or query + // param. Header takes precedence because headers are easier + // for the gateway to forward without rewriting the URL. + const headerGuardianId = req.headers.get("x-guardian-id")?.trim() ?? ""; + const queryGuardianId = + wsUrl.searchParams.get("guardianId")?.trim() ?? ""; + const fallbackGuardianId = headerGuardianId || queryGuardianId; + if (fallbackGuardianId) { + guardianId = fallbackGuardianId; + } + } } const connectionId = crypto.randomUUID(); @@ -652,6 +708,7 @@ export class RuntimeHttpServer { data: { wsType: "browser-relay", connectionId, + guardianId, } satisfies BrowserRelayWebSocketData, }); if (!upgraded) { diff --git a/assistant/src/runtime/routes/conversation-routes.ts b/assistant/src/runtime/routes/conversation-routes.ts index 05bfdd4fdd2..86388e761ea 100644 --- a/assistant/src/runtime/routes/conversation-routes.ts +++ b/assistant/src/runtime/routes/conversation-routes.ts @@ -78,6 +78,7 @@ import { silentlyWithLog } from "../../util/silently.js"; import { buildAssistantEvent } from "../assistant-event.js"; import { DAEMON_INTERNAL_ASSISTANT_ID } from "../assistant-scope.js"; import type { AuthContext } from "../auth/types.js"; +import { getChromeExtensionRegistry } from "../chrome-extension-registry.js"; import { bridgeConfirmationRequestToGuardian } from "../confirmation-request-guardian-bridge.js"; import { routeGuardianReply } from "../guardian-reply-router.js"; import { healGuardianBindingDrift } from "../guardian-vellum-migration.js"; @@ -1164,11 +1165,53 @@ export async function handleSendMessage( } else if (!conversation.isProcessing()) { conversation.setHostBashProxy(undefined); } + // For the chrome-extension interface we route host_browser_request / + // host_browser_cancel frames through the in-process ChromeExtensionRegistry + // to the WebSocket opened against /v1/browser-relay by the connected + // extension, instead of the SSE/onEvent hub used by macOS. The registry + // lookup is keyed by the JWT-derived actor principal id, which the + // runtime captured at WebSocket upgrade time. + // + // macOS (and any other interface that supports host_browser in the + // future via the SSE hub) keeps using `onEvent` — see the else branch. + const browserProxySendToClient: (msg: ServerMessage) => void = + sourceInterface === "chrome-extension" + ? (msg) => { + const gid = authContext.actorPrincipalId; + if (!gid) { + // No guardian identity on this turn — nothing to route to. + // The proxy will observe this via its try/catch and surface a + // transport error back to the caller. + throw new Error( + "chrome-extension host_browser send skipped: no guardianId on AuthContext", + ); + } + const ok = getChromeExtensionRegistry().send(gid, msg); + if (!ok) { + throw new Error( + `chrome-extension host_browser send failed: no active connection for guardian ${gid}`, + ); + } + } + : onEvent; + // Stash the registry-routed sender on the conversation so queue-drain + // restores (which run outside of conversation-routes.ts and only have + // access to `sendToClient`) can preserve it when calling + // `restoreBrowserProxyAvailability()`. For non-chrome-extension + // interfaces the override is cleared so the SSE hub sender is used. + if (sourceInterface === "chrome-extension") { + conversation.hostBrowserSenderOverride = browserProxySendToClient; + } else { + conversation.hostBrowserSenderOverride = undefined; + } if (supportsHostProxy(sourceInterface, "host_browser")) { if (!conversation.isProcessing() || !conversation.hostBrowserProxy) { - const browserProxy = new HostBrowserProxy(onEvent, (requestId) => { - pendingInteractions.resolve(requestId); - }); + const browserProxy = new HostBrowserProxy( + browserProxySendToClient, + (requestId) => { + pendingInteractions.resolve(requestId); + }, + ); conversation.setHostBrowserProxy(browserProxy); } } else if (!conversation.isProcessing()) { @@ -1224,6 +1267,12 @@ export async function handleSendMessage( // helper bypasses the `hasNoClient` gate so the single-capability // chrome-extension turn can drive the browser via CDP without leaking // host_bash/host_file tool availability into tool gating. + // + // `restoreBrowserProxyAvailability()` reads `hostBrowserSenderOverride` + // (set above for chrome-extension) and applies the registry-routed + // sender, so the chrome-extension path gets the correct sender here + // — including after queue-drain restores run from conversation-process.ts, + // which only have access to the conversation instance. if (supportsHostProxy(sourceInterface, "host_browser")) { conversation.restoreBrowserProxyAvailability?.(); }