From bd49c2cde7447feef0730a1980b29dd4a0180ba6 Mon Sep 17 00:00:00 2001 From: Vellum Assistant Date: Sun, 3 May 2026 03:16:58 +0000 Subject: [PATCH] refactor(daemon): extract HostProxyBase from HostCuProxy Extract the structurally-shared lifecycle (pending map, timeout, abort SSE, dispose, isAvailable) from HostCuProxy into a new abstract HostProxyBase class. HostCuProxy now extends the base and retains only CU-specific state (step counter, AX-tree diff, loop detector). Part of plan: app-control-skill.md (PR 1 of 16) --- .../src/__tests__/host-proxy-base.test.ts | 338 ++++++++++++++++++ assistant/src/daemon/host-cu-proxy.ts | 191 +++------- assistant/src/daemon/host-proxy-base.ts | 285 +++++++++++++++ 3 files changed, 671 insertions(+), 143 deletions(-) create mode 100644 assistant/src/__tests__/host-proxy-base.test.ts create mode 100644 assistant/src/daemon/host-proxy-base.ts diff --git a/assistant/src/__tests__/host-proxy-base.test.ts b/assistant/src/__tests__/host-proxy-base.test.ts new file mode 100644 index 00000000000..7726ebfb255 --- /dev/null +++ b/assistant/src/__tests__/host-proxy-base.test.ts @@ -0,0 +1,338 @@ +import { afterEach, describe, expect, jest, mock, test } from "bun:test"; + +const sentMessages: unknown[] = []; +const resolvedInteractionIds: string[] = []; +let mockHasClient = false; + +mock.module("../runtime/assistant-event-hub.js", () => ({ + broadcastMessage: (msg: unknown) => sentMessages.push(msg), + assistantEventHub: { + getMostRecentClientByCapability: (cap: string) => + cap === "host_cu" && mockHasClient ? { id: "mock-client" } : null, + }, +})); + +mock.module("../runtime/pending-interactions.js", () => ({ + resolve: (requestId: string) => { + resolvedInteractionIds.push(requestId); + return undefined; + }, + get: () => undefined, + getByKind: () => [], + getByConversation: () => [], + removeByConversation: () => {}, +})); + +const { HostProxyBase, HostProxyRequestError } = + await import("../daemon/host-proxy-base.js"); + +interface TestRequest { + payload: string; +} + +interface TestResultPayload { + result: string; +} + +class TestProxy extends HostProxyBase { + constructor(timeoutMs?: number) { + super({ + capabilityName: "host_cu", + requestEventName: "test_request", + cancelEventName: "test_cancel", + resultPendingKind: "host_cu", + timeoutMs, + disposedMessage: "Test proxy disposed", + }); + } + + // Re-expose the protected `dispatchRequest` so the tests can drive it directly. + send( + toolName: string, + input: TestRequest, + conversationId: string, + signal?: AbortSignal, + extraFields?: Record, + ): Promise { + return this.dispatchRequest( + toolName, + input, + conversationId, + signal, + extraFields, + ); + } +} + +describe("HostProxyBase", () => { + let proxy: TestProxy; + + function setup(timeoutMs?: number) { + sentMessages.length = 0; + resolvedInteractionIds.length = 0; + mockHasClient = false; + proxy = new TestProxy(timeoutMs); + } + + afterEach(() => { + proxy?.dispose(); + }); + + describe("request lifecycle", () => { + test("broadcasts the configured envelope and resolves on resolve()", async () => { + setup(); + + const promise = proxy.send("tool-1", { payload: "hello" }, "conv-1"); + + expect(sentMessages).toHaveLength(1); + const sent = sentMessages[0] as Record; + expect(sent.type).toBe("test_request"); + expect(sent.conversationId).toBe("conv-1"); + expect(sent.toolName).toBe("tool-1"); + expect(sent.input).toEqual({ payload: "hello" }); + expect(typeof sent.requestId).toBe("string"); + + const requestId = sent.requestId as string; + expect(proxy.hasPendingRequest(requestId)).toBe(true); + + proxy.resolve(requestId, { result: "ok" }); + + await expect(promise).resolves.toEqual({ result: "ok" }); + expect(proxy.hasPendingRequest(requestId)).toBe(false); + }); + + test("merges extraFields into the broadcast envelope", async () => { + setup(); + + const promise = proxy.send( + "tool-1", + { payload: "hi" }, + "conv-1", + undefined, + { stepNumber: 7, reasoning: "because" }, + ); + + const sent = sentMessages[0] as Record; + expect(sent.stepNumber).toBe(7); + expect(sent.reasoning).toBe("because"); + expect(sent.input).toEqual({ payload: "hi" }); // input not nested under extras + + // Resolve so afterEach.dispose() doesn't see an orphan pending request. + proxy.resolve(sent.requestId as string, { result: "ok" }); + await promise; + }); + + test("resolve with unknown requestId is silently ignored", () => { + setup(); + // Should not throw + proxy.resolve("unknown-id", { result: "late" }); + }); + }); + + describe("timeout", () => { + test("rejects with HostProxyRequestError(reason='timeout') after timeoutMs", async () => { + setup(); + + jest.useFakeTimers(); + try { + const promise = proxy.send("tool-1", { payload: "x" }, "conv-1"); + const requestId = (sentMessages[0] as Record) + .requestId as string; + expect(proxy.hasPendingRequest(requestId)).toBe(true); + + // Default timeout is 60s. + jest.advanceTimersByTime(61 * 1000); + + await expect(promise).rejects.toBeInstanceOf(HostProxyRequestError); + await expect(promise).rejects.toMatchObject({ reason: "timeout" }); + expect(proxy.hasPendingRequest(requestId)).toBe(false); + expect(resolvedInteractionIds).toContain(requestId); + } finally { + jest.useRealTimers(); + } + }); + + test("respects custom timeoutMs", async () => { + setup(10); + + jest.useFakeTimers(); + try { + const promise = proxy.send("tool-1", { payload: "x" }, "conv-1"); + jest.advanceTimersByTime(11); + + await expect(promise).rejects.toMatchObject({ reason: "timeout" }); + } finally { + jest.useRealTimers(); + } + }); + }); + + describe("abort signal", () => { + test("broadcasts cancel envelope and rejects with reason='aborted'", async () => { + setup(); + + const controller = new AbortController(); + const promise = proxy.send( + "tool-1", + { payload: "x" }, + "conv-1", + controller.signal, + ); + + const requestId = (sentMessages[0] as Record) + .requestId as string; + + controller.abort(); + + await expect(promise).rejects.toMatchObject({ reason: "aborted" }); + + // Second message should be the cancel envelope. + expect(sentMessages).toHaveLength(2); + const cancel = sentMessages[1] as Record; + expect(cancel.type).toBe("test_cancel"); + expect(cancel.requestId).toBe(requestId); + expect(cancel.conversationId).toBe("conv-1"); + + expect(proxy.hasPendingRequest(requestId)).toBe(false); + expect(resolvedInteractionIds).toContain(requestId); + }); + + test("removes abort listener after normal resolve", async () => { + setup(); + + const controller = new AbortController(); + const removeCalls: string[] = []; + const origRemove = controller.signal.removeEventListener.bind( + controller.signal, + ); + (controller.signal as any).removeEventListener = ( + type: string, + ...rest: any[] + ) => { + removeCalls.push(type); + return (origRemove as any)(type, ...rest); + }; + + const promise = proxy.send( + "tool-1", + { payload: "x" }, + "conv-1", + controller.signal, + ); + + const requestId = (sentMessages[0] as Record) + .requestId as string; + proxy.resolve(requestId, { result: "ok" }); + await promise; + + expect(removeCalls).toEqual(["abort"]); + + // Late aborts must be no-ops with no extra envelopes emitted. + controller.abort(); + expect(sentMessages).toHaveLength(1); + }); + }); + + describe("dispose", () => { + test("rejects all pending requests with reason='disposed'", async () => { + setup(); + + const p1 = proxy.send("t1", { payload: "1" }, "conv-1"); + const p2 = proxy.send("t2", { payload: "2" }, "conv-1"); + + // Suppress unhandled rejection noise — we assert below. + p1.catch(() => {}); + p2.catch(() => {}); + + const beforeIds = (sentMessages as Array>).map( + (m) => m.requestId as string, + ); + expect(beforeIds).toHaveLength(2); + + proxy.dispose(); + + await expect(p1).rejects.toBeInstanceOf(HostProxyRequestError); + await expect(p1).rejects.toMatchObject({ + reason: "disposed", + message: "Test proxy disposed", + }); + await expect(p2).rejects.toMatchObject({ reason: "disposed" }); + + // Cancel envelopes broadcast for each pending request. + const cancelMessages = sentMessages + .slice(2) + .filter( + (m) => (m as Record).type === "test_cancel", + ) as Array>; + expect(cancelMessages).toHaveLength(2); + const cancelIds = cancelMessages.map((m) => m.requestId as string); + expect(cancelIds).toEqual(expect.arrayContaining(beforeIds)); + + // pendingInteractions notified for each pending request. + for (const id of beforeIds) { + expect(resolvedInteractionIds).toContain(id); + } + }); + + test("clears all timers on dispose", async () => { + setup(); + + jest.useFakeTimers(); + try { + const p = proxy.send("t1", { payload: "1" }, "conv-1"); + p.catch(() => {}); + + proxy.dispose(); + + // Advance well past the default timeout — no extra rejection or log + // should fire because the timer was cleared. + jest.advanceTimersByTime(120 * 1000); + + await expect(p).rejects.toMatchObject({ reason: "disposed" }); + } finally { + jest.useRealTimers(); + } + }); + }); + + describe("isAvailable", () => { + test("returns false when no client with the configured capability is connected", () => { + setup(); + mockHasClient = false; + expect(proxy.isAvailable()).toBe(false); + }); + + test("returns true when a client with the configured capability is connected", () => { + setup(); + mockHasClient = true; + expect(proxy.isAvailable()).toBe(true); + }); + }); + + describe("cancel", () => { + test("broadcasts cancel envelope and rejects the pending request", async () => { + setup(); + + const promise = proxy.send("t1", { payload: "1" }, "conv-1"); + promise.catch(() => {}); + const requestId = (sentMessages[0] as Record) + .requestId as string; + + proxy.cancel(requestId, "user-canceled"); + + await expect(promise).rejects.toMatchObject({ reason: "aborted" }); + expect(proxy.hasPendingRequest(requestId)).toBe(false); + + const cancel = sentMessages[1] as Record; + expect(cancel.type).toBe("test_cancel"); + expect(cancel.requestId).toBe(requestId); + expect(resolvedInteractionIds).toContain(requestId); + }); + + test("is a no-op for unknown request id", () => { + setup(); + proxy.cancel("nope", "any"); + expect(sentMessages).toHaveLength(0); + }); + }); +}); diff --git a/assistant/src/daemon/host-cu-proxy.ts b/assistant/src/daemon/host-cu-proxy.ts index 94f27405d05..ab94231da22 100644 --- a/assistant/src/daemon/host-cu-proxy.ts +++ b/assistant/src/daemon/host-cu-proxy.ts @@ -9,21 +9,18 @@ * Unlike HostBashProxy/HostFileProxy/HostTransferProxy, this is NOT a * singleton — each conversation gets its own instance because CU state * (step count, AX tree history, loop detection) is per-conversation. + * + * Lifecycle (pending map, timeout, abort SSE, dispose, isAvailable) lives + * in {@link HostProxyBase}; this class layers CU-specific state and the + * observation → ToolExecutionResult translation on top. */ -import { v4 as uuid } from "uuid"; - import { escapeAxTreeContent } from "../agent/loop.js"; import { loadConfig } from "../config/loader.js"; import type { ContentBlock } from "../providers/types.js"; -import { - assistantEventHub, - broadcastMessage, -} from "../runtime/assistant-event-hub.js"; -import * as pendingInteractions from "../runtime/pending-interactions.js"; import type { ToolExecutionResult } from "../tools/types.js"; -import { AssistantError, ErrorCode } from "../util/errors.js"; import { getLogger } from "../util/logger.js"; +import { HostProxyBase, HostProxyRequestError } from "./host-proxy-base.js"; const log = getLogger("host-cu-proxy"); @@ -31,7 +28,7 @@ const log = getLogger("host-cu-proxy"); // Constants // --------------------------------------------------------------------------- -const REQUEST_TIMEOUT_SEC = 60; +const REQUEST_TIMEOUT_MS = 60 * 1000; const MAX_HISTORY_ENTRIES = 10; const LOOP_DETECTION_WINDOW = 3; const CONSECUTIVE_UNCHANGED_WARNING_THRESHOLD = 2; @@ -61,22 +58,14 @@ export interface ActionRecord { reasoning?: string; } -interface PendingRequest { - resolve: (result: ToolExecutionResult) => void; - reject: (err: Error) => void; - timer: ReturnType; - conversationId: string; - /** Detach the abort listener from the caller's signal. No-op when no signal was passed. */ - detachAbort: () => void; -} - // --------------------------------------------------------------------------- // HostCuProxy // --------------------------------------------------------------------------- -export class HostCuProxy { - private pending = new Map(); - +export class HostCuProxy extends HostProxyBase< + Record, + CuObservationResult +> { // CU state tracking (per-conversation) private _stepCount = 0; private _maxSteps: number; @@ -85,6 +74,14 @@ export class HostCuProxy { private _actionHistory: ActionRecord[] = []; constructor(maxSteps = loadConfig().maxStepsPerSession) { + super({ + capabilityName: "host_cu", + requestEventName: "host_cu_request", + cancelEventName: "host_cu_cancel", + resultPendingKind: "host_cu", + timeoutMs: REQUEST_TIMEOUT_MS, + disposedMessage: "Host CU proxy disposed", + }); this._maxSteps = maxSteps; } @@ -113,20 +110,7 @@ export class HostCuProxy { } // --------------------------------------------------------------------------- - // Availability - // --------------------------------------------------------------------------- - - /** - * Whether a client with `host_cu` capability is connected. - */ - isAvailable(): boolean { - return ( - assistantEventHub.getMostRecentClientByCapability("host_cu") != null - ); - } - - // --------------------------------------------------------------------------- - // Request / resolve lifecycle + // Request lifecycle (CU-specific overlay on top of HostProxyBase.dispatchRequest) // --------------------------------------------------------------------------- request( @@ -152,90 +136,36 @@ export class HostCuProxy { }); } - const requestId = uuid(); - - return new Promise((resolve, reject) => { - let detachAbort: () => void = () => {}; - - const timer = setTimeout(() => { - this.pending.delete(requestId); - detachAbort(); - pendingInteractions.resolve(requestId); - log.warn({ requestId, toolName }, "Host CU proxy request timed out"); - resolve({ - content: "Host CU proxy timed out waiting for client response", - isError: true, - }); - }, REQUEST_TIMEOUT_SEC * 1000); - - if (signal) { - const onAbort = () => { - if (this.pending.has(requestId)) { - clearTimeout(timer); - this.pending.delete(requestId); - detachAbort(); - pendingInteractions.resolve(requestId); - try { - broadcastMessage({ - type: "host_cu_cancel", - requestId, - conversationId, - }); - } catch { - // Best-effort cancel notification — connection may already be closed. - } - resolve({ content: "Aborted", isError: true }); + return this.dispatchRequest(toolName, input, conversationId, signal, { + stepNumber, + reasoning, + }) + .then((observation) => { + // Capture pre-update state so formatObservation sees the correct + // previous AX tree. + const prevAXTree = this._previousAXTree; + this.updateStateFromObservation(observation); + return this.formatObservation(observation, prevAXTree); + }) + .catch((err: unknown) => { + if (err instanceof HostProxyRequestError) { + if (err.reason === "timeout") { + log.warn({ toolName }, "Host CU proxy request timed out"); + return { + content: "Host CU proxy timed out waiting for client response", + isError: true, + } satisfies ToolExecutionResult; } - }; - signal.addEventListener("abort", onAbort, { once: true }); - detachAbort = () => signal.removeEventListener("abort", onAbort); - } - - this.pending.set(requestId, { resolve, reject, timer, conversationId, detachAbort }); - - try { - broadcastMessage({ - type: "host_cu_request", - requestId, - conversationId, - toolName, - input, - stepNumber, - reasoning, - }); - } catch (err) { - clearTimeout(timer); - this.pending.delete(requestId); - detachAbort(); - pendingInteractions.resolve(requestId); - log.warn({ requestId, toolName, err }, "Host CU proxy send failed"); - reject(err instanceof Error ? err : new Error(String(err))); - } - }); - } - - resolve(requestId: string, observation: CuObservationResult): void { - const entry = this.pending.get(requestId); - if (!entry) { - log.warn({ requestId }, "No pending host CU request for response"); - return; - } - clearTimeout(entry.timer); - entry.detachAbort(); - this.pending.delete(requestId); - - // Capture pre-update state so formatObservation sees the correct previous AX tree - const prevAXTree = this._previousAXTree; - - // Update CU state from observation - this.updateStateFromObservation(observation); - - const result = this.formatObservation(observation, prevAXTree); - entry.resolve(result); - } - - hasPendingRequest(requestId: string): boolean { - return this.pending.has(requestId); + if (err.reason === "aborted") { + return { + content: "Aborted", + isError: true, + } satisfies ToolExecutionResult; + } + } + // Disposed or other unexpected errors propagate to the caller. + throw err; + }); } // --------------------------------------------------------------------------- @@ -396,31 +326,6 @@ export class HostCuProxy { }; } - // --------------------------------------------------------------------------- - // Dispose - // --------------------------------------------------------------------------- - - dispose(): void { - for (const [requestId, entry] of this.pending) { - clearTimeout(entry.timer); - entry.detachAbort(); - pendingInteractions.resolve(requestId); - try { - broadcastMessage({ - type: "host_cu_cancel", - requestId, - conversationId: entry.conversationId, - }); - } catch { - // Best-effort cancel notification — connection may already be closed. - } - entry.reject( - new AssistantError("Host CU proxy disposed", ErrorCode.INTERNAL_ERROR), - ); - } - this.pending.clear(); - } - // --------------------------------------------------------------------------- // Private helpers // --------------------------------------------------------------------------- diff --git a/assistant/src/daemon/host-proxy-base.ts b/assistant/src/daemon/host-proxy-base.ts new file mode 100644 index 00000000000..a92ebc721fd --- /dev/null +++ b/assistant/src/daemon/host-proxy-base.ts @@ -0,0 +1,285 @@ +/** + * Shared lifecycle base for host-proxy classes (HostBashProxy, HostCuProxy, + * HostFileProxy, HostTransferProxy, HostBrowserProxy, ...). + * + * Each host proxy: + * - dispatches a request to the desktop client via the assistant event hub, + * - tracks the request in a pending map keyed by `requestId`, + * - times the request out after a configurable interval, + * - cancels the request when the caller's `AbortSignal` fires, + * - rejects all pending requests on `dispose()`, + * - exposes `isAvailable()` based on the connected client's capabilities. + * + * Subclasses keep proxy-specific concerns (envelope shape, observation + * formatting, per-proxy state like CU's step counter) out of the base. + */ +import { v4 as uuid } from "uuid"; + +import type { HostProxyCapability } from "../channels/types.js"; +import { + assistantEventHub, + broadcastMessage, +} from "../runtime/assistant-event-hub.js"; +import * as pendingInteractions from "../runtime/pending-interactions.js"; +import { AssistantError, ErrorCode } from "../util/errors.js"; +import { getLogger } from "../util/logger.js"; +import type { ServerMessage } from "./message-protocol.js"; + +const log = getLogger("host-proxy-base"); + +/** + * `broadcastMessage` is statically typed against the discriminated + * `ServerMessage` union. The base class assembles envelopes from + * constructor-supplied event names and untyped extra fields, so static + * narrowing is impossible — subclasses are responsible for passing event + * names that match a real `ServerMessage` variant. + */ +function broadcastDynamic(envelope: Record): void { + broadcastMessage(envelope as unknown as ServerMessage); +} + +const DEFAULT_TIMEOUT_MS = 60_000; + +/** Reason a pending request was rejected by the base. */ +export type HostProxyRejectionReason = "timeout" | "aborted" | "disposed"; + +/** + * Error thrown by the base when a pending request is rejected via the + * lifecycle paths (timeout, abort, dispose). Subclasses inspect `reason` + * to map back to their proxy-specific error / observation shape. + */ +export class HostProxyRequestError extends AssistantError { + constructor( + message: string, + public readonly reason: HostProxyRejectionReason, + ) { + super(message, ErrorCode.INTERNAL_ERROR); + this.name = "HostProxyRequestError"; + } +} + +interface PendingEntry { + resolve: (payload: TResultPayload) => void; + reject: (err: Error) => void; + timer: ReturnType; + conversationId: string; + /** Detach the abort listener from the caller's signal. No-op when no signal was passed. */ + detachAbort: () => void; +} + +export interface HostProxyBaseOptions { + /** Capability advertised by clients that can service this proxy. */ + capabilityName: HostProxyCapability; + /** Outbound message `type` for new requests (e.g. `"host_cu_request"`). */ + requestEventName: string; + /** Outbound message `type` for cancellation (e.g. `"host_cu_cancel"`). */ + cancelEventName: string; + /** Tag used to identify this proxy's requests in `pendingInteractions`. */ + resultPendingKind: string; + /** Per-request timeout. Defaults to 60s. */ + timeoutMs?: number; + /** Customizable disposed-rejection message (used in test assertions). */ + disposedMessage?: string; +} + +export abstract class HostProxyBase { + protected pending = new Map>(); + + protected readonly capabilityName: HostProxyCapability; + protected readonly requestEventName: string; + protected readonly cancelEventName: string; + protected readonly resultPendingKind: string; + protected readonly timeoutMs: number; + protected readonly disposedMessage: string; + + constructor(opts: HostProxyBaseOptions) { + this.capabilityName = opts.capabilityName; + this.requestEventName = opts.requestEventName; + this.cancelEventName = opts.cancelEventName; + this.resultPendingKind = opts.resultPendingKind; + this.timeoutMs = opts.timeoutMs ?? DEFAULT_TIMEOUT_MS; + this.disposedMessage = opts.disposedMessage ?? "Host proxy disposed"; + } + + /** + * Whether a client advertising the configured capability is connected. + */ + isAvailable(): boolean { + return ( + assistantEventHub.getMostRecentClientByCapability(this.capabilityName) != + null + ); + } + + /** + * Dispatch a request envelope to the connected client and return a + * promise that resolves when the client responds (via `resolve()`), + * rejects on timeout/abort/dispose, or rejects synchronously if the + * broadcast itself fails. + * + * `extraFields` is shallow-merged into the broadcast envelope so + * subclasses can include proxy-specific top-level fields (e.g. CU's + * `stepNumber` / `reasoning`) without nesting them inside `input`. + * + * Named `dispatchRequest` rather than `request` so subclasses are free to + * expose their own public `request(...)` with a proxy-specific signature + * (e.g. CU passes `stepNumber` and `reasoning` to its callers). + */ + protected dispatchRequest( + toolName: string, + input: TRequest, + conversationId: string, + signal?: AbortSignal, + extraFields?: Record, + ): Promise { + const requestId = uuid(); + + return new Promise((resolve, reject) => { + // Declared up-front so onAbort can close over a stable reference once + // it's wired below. + let detachAbort: () => void = () => {}; + + const timer = setTimeout(() => { + this.pending.delete(requestId); + detachAbort(); + pendingInteractions.resolve(requestId); + log.warn( + { requestId, toolName, kind: this.resultPendingKind }, + "Host proxy request timed out", + ); + reject(new HostProxyRequestError("timeout", "timeout")); + }, this.timeoutMs); + + if (signal) { + const onAbort = () => { + if (this.pending.has(requestId)) { + clearTimeout(timer); + this.pending.delete(requestId); + detachAbort(); + pendingInteractions.resolve(requestId); + try { + broadcastDynamic({ + type: this.cancelEventName, + requestId, + conversationId, + }); + } catch { + // Best-effort cancel notification — connection may already be closed. + } + reject(new HostProxyRequestError("aborted", "aborted")); + } + }; + signal.addEventListener("abort", onAbort, { once: true }); + detachAbort = () => signal.removeEventListener("abort", onAbort); + } + + this.pending.set(requestId, { + resolve, + reject, + timer, + conversationId, + detachAbort, + }); + + try { + broadcastDynamic({ + type: this.requestEventName, + requestId, + conversationId, + toolName, + input, + ...(extraFields ?? {}), + }); + } catch (err) { + clearTimeout(timer); + this.pending.delete(requestId); + detachAbort(); + pendingInteractions.resolve(requestId); + log.warn( + { requestId, toolName, kind: this.resultPendingKind, err }, + "Host proxy send failed", + ); + reject(err instanceof Error ? err : new Error(String(err))); + } + }); + } + + /** + * Resolve a pending request with the client-provided payload. No-op when + * no entry is registered for `requestId` (late responses after timeout + * or abort fall through to here). + */ + resolve(requestId: string, payload: TResultPayload): void { + const entry = this.pending.get(requestId); + if (!entry) { + log.warn( + { requestId, kind: this.resultPendingKind }, + "No pending host proxy request for response", + ); + return; + } + clearTimeout(entry.timer); + entry.detachAbort(); + this.pending.delete(requestId); + entry.resolve(payload); + } + + /** + * Whether `requestId` is still registered as pending. Useful to subclasses + * that need to reason about the lifecycle in tests. + */ + hasPendingRequest(requestId: string): boolean { + return this.pending.has(requestId); + } + + /** + * Cancel a pending request out-of-band (e.g. when the conversation is + * being torn down). Broadcasts a cancel envelope and rejects with + * `"aborted"`. No-op when no entry is registered. + */ + cancel(requestId: string, reason: string): void { + const entry = this.pending.get(requestId); + if (!entry) return; + clearTimeout(entry.timer); + entry.detachAbort(); + this.pending.delete(requestId); + pendingInteractions.resolve(requestId); + log.info( + { requestId, reason, kind: this.resultPendingKind }, + "Host proxy request canceled", + ); + try { + broadcastDynamic({ + type: this.cancelEventName, + requestId, + conversationId: entry.conversationId, + }); + } catch { + // Best-effort cancel notification — connection may already be closed. + } + entry.reject(new HostProxyRequestError("aborted", "aborted")); + } + + /** + * Reject every pending request and clear the map. Called during graceful + * shutdown or proxy teardown. + */ + dispose(): void { + for (const [requestId, entry] of this.pending) { + clearTimeout(entry.timer); + entry.detachAbort(); + pendingInteractions.resolve(requestId); + try { + broadcastDynamic({ + type: this.cancelEventName, + requestId, + conversationId: entry.conversationId, + }); + } catch { + // Best-effort cancel notification — connection may already be closed. + } + entry.reject(new HostProxyRequestError(this.disposedMessage, "disposed")); + } + this.pending.clear(); + } +}