diff --git a/assistant/src/__tests__/db-proxy-transaction.test.ts b/assistant/src/__tests__/db-proxy-transaction.test.ts index 7ced6d2fc9d..6accb5992d8 100644 --- a/assistant/src/__tests__/db-proxy-transaction.test.ts +++ b/assistant/src/__tests__/db-proxy-transaction.test.ts @@ -13,6 +13,7 @@ import { beforeEach, describe, expect, test } from "bun:test"; import { handleDbProxyTransaction } from "../ipc/routes/db-proxy-transaction.js"; import { getSqlite } from "../memory/db-connection.js"; import { initializeDb } from "../memory/db-init.js"; +import { RouteError } from "../runtime/routes/errors.js"; initializeDb(); @@ -65,7 +66,8 @@ describe("db_proxy_transaction", () => { .prepare("INSERT INTO proxy_tx_test (id, label) VALUES (?, ?)") .run(2, "preexisting"); - expect(() => + let caught: unknown; + try { handleDbProxyTransaction({ steps: [ { @@ -78,8 +80,23 @@ describe("db_proxy_transaction", () => { bind: [2, "beta"], }, ], - }), - ).toThrow(); + }); + } catch (err) { + caught = err; + } + + // The thrown error must be a RouteError carrying the underlying SQL + // message — without the wrapping, the IPC envelope would lose the + // statusCode and the gateway-side strict caller would misclassify + // this as a transport failure ("assistant may not be ready"). + expect(caught).toBeInstanceOf(RouteError); + if (caught instanceof RouteError) { + expect(caught.code).toBe("DB_PROXY_TRANSACTION_FAILED"); + expect(caught.statusCode).toBe(500); + // The original SQL constraint message must survive the wrap so + // operators can debug from the gateway logs. + expect(caught.message).toMatch(/UNIQUE|PRIMARY KEY|constraint/i); + } // The first step's insert must NOT have committed. expect(rowCount()).toBe(1); @@ -156,10 +173,19 @@ describe("db_proxy_transaction", () => { expect(updated.count).toBe(1); }); - test("rejects empty step list", () => { - expect(() => handleDbProxyTransaction({ steps: [] })).toThrow( - /at least one step/, - ); + test("rejects empty step list with a 400 RouteError", () => { + let caught: unknown; + try { + handleDbProxyTransaction({ steps: [] }); + } catch (err) { + caught = err; + } + expect(caught).toBeInstanceOf(RouteError); + if (caught instanceof RouteError) { + expect(caught.statusCode).toBe(400); + expect(caught.code).toBe("INVALID_PARAMS"); + expect(caught.message).toMatch(/at least one step/); + } }); test("returns lastInsertRowid for inserts", () => { diff --git a/assistant/src/ipc/routes/db-proxy-transaction.ts b/assistant/src/ipc/routes/db-proxy-transaction.ts index 2b7ace2fd6f..d32b4f31409 100644 --- a/assistant/src/ipc/routes/db-proxy-transaction.ts +++ b/assistant/src/ipc/routes/db-proxy-transaction.ts @@ -27,6 +27,7 @@ */ import { getSqlite } from "../../memory/db-connection.js"; +import { RouteError } from "../../runtime/routes/errors.js"; import { getLogger } from "../../util/logger.js"; const log = getLogger("db-proxy-transaction"); @@ -71,7 +72,11 @@ export function handleDbProxyTransaction( const db = getSqlite(); if (!Array.isArray(params.steps) || params.steps.length === 0) { - throw new Error("db_proxy_transaction requires at least one step"); + throw new RouteError( + "db_proxy_transaction requires at least one step", + "INVALID_PARAMS", + 400, + ); } // Sentinel used to abort the transaction without leaking through as a generic @@ -128,7 +133,14 @@ export function handleDbProxyTransaction( requiredChanges: err.requiredChanges, }; } - throw err; + // Wrap raw SQL/runtime errors in RouteError so the IPC envelope carries + // a statusCode + errorCode. Without this, the gateway-side strict caller + // sees a structureless `msg.error` and misclassifies it as a transport + // failure ("assistant may not be ready"), masking the real SQL error + // and breaking debuggability + retry decisions. + const message = err instanceof Error ? err.message : String(err); + log.warn({ err }, "db-proxy-transaction execution failed"); + throw new RouteError(message, "DB_PROXY_TRANSACTION_FAILED", 500); } log.debug( diff --git a/gateway/src/__tests__/contact-prompt-submit.test.ts b/gateway/src/__tests__/contact-prompt-submit.test.ts index ec4ea668116..65199ce05fd 100644 --- a/gateway/src/__tests__/contact-prompt-submit.test.ts +++ b/gateway/src/__tests__/contact-prompt-submit.test.ts @@ -30,13 +30,13 @@ initSigningKey(Buffer.from("test-signing-key-at-least-32-bytes-long-xx")); let testAssistantDb: Database | null = null; mock.module("../db/assistant-db-proxy.js", () => ({ - // eslint-disable-next-line @typescript-eslint/no-explicit-any + async assistantDbQuery(sql: string, bind?: any[]) { if (!testAssistantDb) throw new Error("test assistant DB not initialized"); const stmt = testAssistantDb.prepare(sql); return bind ? stmt.all(...bind) : stmt.all(); }, - // eslint-disable-next-line @typescript-eslint/no-explicit-any + async assistantDbRun(sql: string, bind?: any[]) { if (!testAssistantDb) throw new Error("test assistant DB not initialized"); const stmt = testAssistantDb.prepare(sql); @@ -187,7 +187,7 @@ describe("handleContactPromptSubmit", () => { // IPC should have been called with the guardian contactId. expect(ipcMock).toHaveBeenCalledTimes(1); - // eslint-disable-next-line @typescript-eslint/no-explicit-any + const ipcCall = (ipcMock.mock.calls as any[][])[0][1] as { body: Record }; expect(ipcCall.body.contactId).toBe("guardian-1"); }); @@ -258,7 +258,7 @@ describe("handleContactPromptSubmit", () => { // IPC should have been called with an error so the CLI doesn't hang. expect(ipcMock).toHaveBeenCalledTimes(1); - // eslint-disable-next-line @typescript-eslint/no-explicit-any + const ipcCall = (ipcMock.mock.calls as any[][])[0][1] as { body: Record }; expect(typeof ipcCall.body.error).toBe("string"); }); @@ -315,7 +315,7 @@ describe("handleContactPromptSubmit", () => { expect(contacts).toHaveLength(1); expect(contacts[0].id).toBe("contact-1"); - // eslint-disable-next-line @typescript-eslint/no-explicit-any + const ipcCall = (ipcMock.mock.calls as any[][])[0][1] as { body: Record }; expect(ipcCall.body.contactId).toBe("contact-1"); }); diff --git a/gateway/src/db/assistant-db-proxy.ts b/gateway/src/db/assistant-db-proxy.ts index 8f5aa331214..b80c030a20a 100644 --- a/gateway/src/db/assistant-db-proxy.ts +++ b/gateway/src/db/assistant-db-proxy.ts @@ -13,7 +13,10 @@ * gateway's own database. */ -import { ipcCallAssistant } from "../ipc/assistant-client.js"; +import { + IpcHandlerError, + ipcCallAssistant, +} from "../ipc/assistant-client.js"; type SqliteValue = string | number | null | Uint8Array; @@ -28,11 +31,11 @@ async function dbProxy( mode: "query" | "run" | "exec", bind?: SqliteValue[], ): Promise { - const result = await ipcCallAssistant("db_proxy", { sql, mode, bind }); - if (result === undefined) { - throw new Error("db_proxy IPC call failed — assistant may not be ready"); - } - return result as DbProxyResult; + return (await ipcCallAssistant("db_proxy", { + sql, + mode, + bind, + })) as DbProxyResult; } /** @@ -107,17 +110,28 @@ export type AssistantDbTransactionResult = * Use this when several writes must succeed or fail as a unit (e.g. invite * redemption: contact-channel upsert + invite-use record). * + * Error handling: + * - `requireChanges` violations return `{ ok: false, reason: "require_changes_failed", ... }`. + * - Handler-level failures (SQL constraint errors, malformed params) throw + * `IpcHandlerError` so the underlying SQL message is preserved. + * - Transport failures (socket missing, daemon unreachable, timeout) throw + * `IpcTransportError`. Use this to distinguish retryable vs. + * non-retryable failures. + * * Read-modify-write across steps is not supported. Use SQL-level conditions * (WHERE clauses, ON CONFLICT) plus `requireChanges` for stale-write detection. */ export async function assistantDbTransaction( steps: AssistantDbTransactionStep[], ): Promise { - const result = await ipcCallAssistant("db_proxy_transaction", { steps }); - if (result === undefined) { - throw new Error( - "db_proxy_transaction IPC call failed — assistant may not be ready", - ); - } - return result as AssistantDbTransactionResult; + return (await ipcCallAssistant("db_proxy_transaction", { + steps, + })) as AssistantDbTransactionResult; } + +/** + * Re-export so callers in this module's domain (gateway DB write helpers) + * can identify SQL/handler failures from the assistant DB proxy without + * importing from the IPC client directly. + */ +export { IpcHandlerError }; diff --git a/gateway/src/db/schema.ts b/gateway/src/db/schema.ts index 0c4a2f2cdab..380ce80e4d1 100644 --- a/gateway/src/db/schema.ts +++ b/gateway/src/db/schema.ts @@ -121,6 +121,35 @@ export const contactChannels = sqliteTable( ], ); +export const ingressInvites = sqliteTable( + "ingress_invites", + { + id: text("id").primaryKey(), + sourceChannel: text("source_channel").notNull(), + inviteCodeHash: text("invite_code_hash").notNull(), + note: text("note"), + maxUses: integer("max_uses").notNull().default(1), + useCount: integer("use_count").notNull().default(0), + expiresAt: integer("expires_at").notNull(), + status: text("status").notNull().default("active"), + redeemedByExternalUserId: text("redeemed_by_external_user_id"), + redeemedByExternalChatId: text("redeemed_by_external_chat_id"), + redeemedAt: integer("redeemed_at"), + contactId: text("contact_id") + .notNull() + .references(() => contacts.id, { onDelete: "cascade" }), + createdAt: integer("created_at").notNull(), + updatedAt: integer("updated_at").notNull(), + }, + (table) => [ + index("idx_ingress_invites_code_lookup").on( + table.inviteCodeHash, + table.sourceChannel, + ), + index("idx_ingress_invites_contact").on(table.contactId), + ], +); + // --------------------------------------------------------------------------- // Auto-approve thresholds // --------------------------------------------------------------------------- diff --git a/gateway/src/handlers/handle-inbound.ts b/gateway/src/handlers/handle-inbound.ts index fc76563f020..4a85fff1336 100644 --- a/gateway/src/handlers/handle-inbound.ts +++ b/gateway/src/handlers/handle-inbound.ts @@ -173,12 +173,9 @@ export async function handleInbound( // ── Contact channel interaction tracking (dual-write) ── // Reads from the assistant DB (source of truth during migration), - // writes to both assistant DB and gateway DB. Uses ipcCallAssistant - // directly (resolves to undefined on failure, never throws) so - // socket errors cannot leak as unhandled rejections in tests. + // writes to both assistant DB and gateway DB. Fire-and-forget so + // IPC failures here cannot leak as unhandled rejections. if (!response.denied) { - // Fire-and-forget: detach from current async context so pending - // IPC socket operations cannot leak into test runners. void touchContactChannelStats(event, response.duplicate).catch( () => {}, ); @@ -211,8 +208,8 @@ interface DbProxyResult { * Look up the contact channel in the assistant DB and dual-write * interaction stats to both the assistant and gateway databases. * - * Uses ipcCallAssistant directly (resolves to undefined on failure) - * so socket errors cannot surface as unhandled rejections. + * Caller wraps in `.catch(() => {})` so IPC failures cannot surface as + * unhandled rejections. */ async function touchContactChannelStats( event: GatewayInboundEvent, @@ -234,17 +231,17 @@ async function touchContactChannelStats( sql: "SELECT id FROM contact_channels WHERE type = ? AND external_user_id = ? LIMIT 1", mode: "query", bind: [event.sourceChannel, canonicalActorId], - })) as DbProxyResult | undefined; + })) as DbProxyResult; - if (!result?.rows?.length) { + if (!result.rows?.length) { result = (await ipcCallAssistant("db_proxy", { sql: "SELECT id FROM contact_channels WHERE type = ? AND external_chat_id = ? LIMIT 1", mode: "query", bind: [event.sourceChannel, event.message.conversationExternalId], - })) as DbProxyResult | undefined; + })) as DbProxyResult; } - if (!result?.rows?.length) return; + if (!result.rows?.length) return; const channelId = result.rows[0].id as string; const now = Date.now(); diff --git a/gateway/src/http/routes/contact-prompt.ts b/gateway/src/http/routes/contact-prompt.ts index 9d3f0288854..89ff5f1ae98 100644 --- a/gateway/src/http/routes/contact-prompt.ts +++ b/gateway/src/http/routes/contact-prompt.ts @@ -168,9 +168,10 @@ export async function handleContactPromptSubmit(req: Request): Promise { channelType, address: normalizedAddress, contactId, existingContactId: existingChannel[0].contactId }, "contact-prompt-submit: channel already assigned to another contact", ); - await ipcCallAssistant("resolve_contact_prompt", { - body: { requestId, error: "Channel already assigned to another contact" }, - }); + await notifyDaemonResolveError( + requestId, + "Channel already assigned to another contact", + ); return Response.json( { accepted: false, error: "Channel already assigned to another contact" }, { status: 409 }, @@ -223,9 +224,10 @@ export async function handleContactPromptSubmit(req: Request): Promise } // Notify daemon of failure so the CLI doesn't hang. - await ipcCallAssistant("resolve_contact_prompt", { - body: { requestId, error: "Failed to create contact channel" }, - }); + await notifyDaemonResolveError( + requestId, + "Failed to create contact channel", + ); return Response.json( { accepted: false, error: "Failed to create contact channel" }, { status: 500 }, @@ -239,22 +241,49 @@ export async function handleContactPromptSubmit(req: Request): Promise } } catch (err) { log.error({ err, requestId }, "contact-prompt-submit: DB error"); - await ipcCallAssistant("resolve_contact_prompt", { - body: { requestId, error: "Database error" }, - }); + await notifyDaemonResolveError(requestId, "Database error"); return Response.json({ accepted: false, error: "Database error" }, { status: 500 }); } // Notify daemon to unblock the waiting contacts/prompt IPC call. - const ipcResult = await ipcCallAssistant("resolve_contact_prompt", { - body: { requestId, contactId, channelId, channelType, address: normalizedAddress }, - }); - if (!ipcResult || (ipcResult as { resolved?: boolean }).resolved === false) { + try { + const ipcResult = await ipcCallAssistant("resolve_contact_prompt", { + body: { requestId, contactId, channelId, channelType, address: normalizedAddress }, + }); + if ((ipcResult as { resolved?: boolean }).resolved === false) { + log.warn( + { requestId, contactId }, + "contact-prompt-submit: resolve_contact_prompt IPC did not find a pending prompt — CLI may time out", + ); + } + } catch (err) { log.warn( - { requestId, contactId }, - "contact-prompt-submit: resolve_contact_prompt IPC did not find a pending prompt — CLI may time out", + { err, requestId, contactId }, + "contact-prompt-submit: resolve_contact_prompt IPC failed — CLI may time out", ); } return Response.json({ accepted: true }); } + +/** + * Best-effort notification to the daemon that a pending contact prompt has + * resolved with an error. Failures here must not block the HTTP response — + * the caller has already decided the request failed; we just want to wake + * the CLI up. + */ +async function notifyDaemonResolveError( + requestId: string, + error: string, +): Promise { + try { + await ipcCallAssistant("resolve_contact_prompt", { + body: { requestId, error }, + }); + } catch (err) { + log.warn( + { err, requestId }, + "contact-prompt-submit: resolve_contact_prompt error notification failed", + ); + } +} diff --git a/gateway/src/http/routes/contacts-control-plane-proxy.ts b/gateway/src/http/routes/contacts-control-plane-proxy.ts index f9141401cdd..3f6b9bdec67 100644 --- a/gateway/src/http/routes/contacts-control-plane-proxy.ts +++ b/gateway/src/http/routes/contacts-control-plane-proxy.ts @@ -103,7 +103,7 @@ export function createContactsControlPlaneProxyHandler(config: GatewayConfig) { getGatewayDb().delete(contacts).where(eq(contacts.id, contactId)).run(); void ipcCallAssistant("emit_event", { body: { kind: "contacts_changed" }, - } as unknown as Record); + } as unknown as Record).catch(() => {}); log.info({ contactId }, "delete_contact: deleted"); return new Response(null, { status: 204 }); }, diff --git a/gateway/src/http/routes/ipc-runtime-proxy.test.ts b/gateway/src/http/routes/ipc-runtime-proxy.test.ts index 8fcf6417162..d358876cd73 100644 --- a/gateway/src/http/routes/ipc-runtime-proxy.test.ts +++ b/gateway/src/http/routes/ipc-runtime-proxy.test.ts @@ -31,39 +31,38 @@ class MockIpcTransportError extends Error { } } -// ipcCallAssistant — used by refreshRouteSchema to prime the cache -const ipcCallAssistantMock = mock(() => - Promise.resolve([ - { operationId: "health", endpoint: "health", method: "GET" }, - { operationId: "acp_steer", endpoint: "acp/:id/steer", method: "POST" }, - { - operationId: "acp_list_sessions", - endpoint: "acp/sessions", - method: "GET", - }, - { - operationId: "apps_dist_file", - endpoint: "apps/:appId/dist/:filename", - method: "GET", - }, - // Policy-enforced routes (policies resolved gateway-side) - { operationId: "settings_get", endpoint: "settings", method: "GET" }, - { operationId: "calls_start", endpoint: "calls/start", method: "POST" }, - ]), -); +// Single mock for `ipcCallAssistant` — used by both `refreshRouteSchema` +// (to prime the cache) and `tryIpcProxy` (per-request IPC calls). +const ROUTE_SCHEMA = [ + { operationId: "health", endpoint: "health", method: "GET" }, + { operationId: "acp_steer", endpoint: "acp/:id/steer", method: "POST" }, + { + operationId: "acp_list_sessions", + endpoint: "acp/sessions", + method: "GET", + }, + { + operationId: "apps_dist_file", + endpoint: "apps/:appId/dist/:filename", + method: "GET", + }, + // Policy-enforced routes (policies resolved gateway-side) + { operationId: "settings_get", endpoint: "settings", method: "GET" }, + { operationId: "calls_start", endpoint: "calls/start", method: "POST" }, +]; + +const defaultIpcImpl = ( + method: string, + _params?: Record, +): Promise => { + if (method === "get_route_schema") return Promise.resolve(ROUTE_SCHEMA); + return Promise.resolve({ ok: true }); +}; -// ipcCallAssistantStrict — used by tryIpcProxy for actual IPC calls -const ipcCallAssistantStrictMock = mock( - (_method: string, _params?: Record) => - Promise.resolve({ ok: true }), -); +const ipcCallAssistantMock = mock(defaultIpcImpl); -// Single mock.module for assistant-client — must include ALL exports -// that any transitive import needs (route-schema-cache uses ipcCallAssistant, -// ipc-runtime-proxy uses ipcCallAssistantStrict + error classes). mock.module("../../ipc/assistant-client.js", () => ({ ipcCallAssistant: ipcCallAssistantMock, - ipcCallAssistantStrict: ipcCallAssistantStrictMock, IpcHandlerError: MockIpcHandlerError, IpcTransportError: MockIpcTransportError, })); @@ -196,10 +195,8 @@ describe("matchRoute", () => { describe("tryIpcProxy", () => { beforeEach(() => { - ipcCallAssistantStrictMock.mockReset(); - ipcCallAssistantStrictMock.mockImplementation(() => - Promise.resolve({ ok: true }), - ); + ipcCallAssistantMock.mockReset(); + ipcCallAssistantMock.mockImplementation(defaultIpcImpl); validateEdgeTokenMock.mockReset(); validateEdgeTokenMock.mockImplementation(() => ({ ok: true, @@ -241,8 +238,8 @@ describe("tryIpcProxy", () => { expect(result).not.toBeNull(); expect(result!.status).toBe(200); - expect(ipcCallAssistantStrictMock).toHaveBeenCalledTimes(1); - const [opId, params] = ipcCallAssistantStrictMock.mock.calls[0] as [ + expect(ipcCallAssistantMock).toHaveBeenCalledTimes(1); + const [opId, params] = ipcCallAssistantMock.mock.calls[0] as [ string, Record, ]; @@ -263,7 +260,7 @@ describe("tryIpcProxy", () => { await tryIpcProxy(req, makeConfig()); - const [, params] = ipcCallAssistantStrictMock.mock.calls[0] as [ + const [, params] = ipcCallAssistantMock.mock.calls[0] as [ string, Record, ]; @@ -306,7 +303,7 @@ describe("tryIpcProxy", () => { }); test("returns handler error status code from IpcHandlerError", async () => { - ipcCallAssistantStrictMock.mockImplementation(() => { + ipcCallAssistantMock.mockImplementation(() => { throw new MockIpcHandlerError("Not found", 404, "NOT_FOUND"); }); @@ -320,7 +317,7 @@ describe("tryIpcProxy", () => { }); test("returns 502 on transport error", async () => { - ipcCallAssistantStrictMock.mockImplementation(() => { + ipcCallAssistantMock.mockImplementation(() => { throw new MockIpcTransportError("Socket closed"); }); @@ -333,7 +330,7 @@ describe("tryIpcProxy", () => { const req = makeRequest("/v1/acp/sessions?limit=10&offset=5"); await tryIpcProxy(req, makeConfig()); - const [, params] = ipcCallAssistantStrictMock.mock.calls[0] as [ + const [, params] = ipcCallAssistantMock.mock.calls[0] as [ string, Record, ]; @@ -347,10 +344,8 @@ describe("tryIpcProxy", () => { describe("policy enforcement", () => { beforeEach(() => { - ipcCallAssistantStrictMock.mockReset(); - ipcCallAssistantStrictMock.mockImplementation(() => - Promise.resolve({ ok: true }), - ); + ipcCallAssistantMock.mockReset(); + ipcCallAssistantMock.mockImplementation(defaultIpcImpl); validateEdgeTokenMock.mockReset(); }); diff --git a/gateway/src/http/routes/ipc-runtime-proxy.ts b/gateway/src/http/routes/ipc-runtime-proxy.ts index 4a6eaf988e6..5236947c16c 100644 --- a/gateway/src/http/routes/ipc-runtime-proxy.ts +++ b/gateway/src/http/routes/ipc-runtime-proxy.ts @@ -22,9 +22,9 @@ import { validateEdgeToken } from "../../auth/token-exchange.js"; import type { TokenClaims } from "../../auth/types.js"; import type { GatewayConfig } from "../../config.js"; import { - ipcCallAssistantStrict, IpcHandlerError, IpcTransportError, + ipcCallAssistant, } from "../../ipc/assistant-client.js"; import { matchRoute } from "../../ipc/route-schema-cache.js"; import { getLogger } from "../../logger.js"; @@ -160,7 +160,7 @@ export async function tryIpcProxy( // --- Call daemon via IPC ------------------------------------------------ try { - const result = await ipcCallAssistantStrict(match.operationId, params); + const result = await ipcCallAssistant(match.operationId, params); const duration = Math.round(performance.now() - start); log.info( diff --git a/gateway/src/ipc/assistant-client.test.ts b/gateway/src/ipc/assistant-client.test.ts index 3eb86425143..abc4ff31144 100644 --- a/gateway/src/ipc/assistant-client.test.ts +++ b/gateway/src/ipc/assistant-client.test.ts @@ -14,7 +14,12 @@ import { tmpdir } from "node:os"; import { join } from "node:path"; import { afterEach, beforeEach, describe, expect, test } from "bun:test"; -import { ipcCallAssistant, ipcSuggestTrustRule } from "./assistant-client.js"; +import { + IpcHandlerError, + IpcTransportError, + ipcCallAssistant, + ipcSuggestTrustRule, +} from "./assistant-client.js"; // --------------------------------------------------------------------------- // Test infrastructure @@ -22,10 +27,16 @@ import { ipcCallAssistant, ipcSuggestTrustRule } from "./assistant-client.js"; let server: Server | undefined; let origWorkspaceDir: string | undefined; +let origAssistantIpcDir: string | undefined; -// Save and restore VELLUM_WORKSPACE_DIR around each test. +// Save and restore VELLUM_WORKSPACE_DIR + ASSISTANT_IPC_SOCKET_DIR around +// each test. The sandbox sets ASSISTANT_IPC_SOCKET_DIR, which would +// otherwise win over VELLUM_WORKSPACE_DIR in `resolveIpcSocketPath` and +// route requests to the real daemon socket instead of our test server. beforeEach(() => { origWorkspaceDir = process.env.VELLUM_WORKSPACE_DIR; + origAssistantIpcDir = process.env.ASSISTANT_IPC_SOCKET_DIR; + delete process.env.ASSISTANT_IPC_SOCKET_DIR; server = undefined; }); @@ -36,6 +47,12 @@ afterEach(async () => { delete process.env.VELLUM_WORKSPACE_DIR; } + if (origAssistantIpcDir !== undefined) { + process.env.ASSISTANT_IPC_SOCKET_DIR = origAssistantIpcDir; + } else { + delete process.env.ASSISTANT_IPC_SOCKET_DIR; + } + if (server) { await new Promise((resolve) => { server!.close(() => resolve()); @@ -72,6 +89,19 @@ function sendError(socket: Socket, id: string, error: string): void { socket.write(JSON.stringify({ id, error }) + "\n"); } +/** Send a handler-level error (with statusCode) over the socket. */ +function sendHandlerError( + socket: Socket, + id: string, + error: string, + statusCode: number, + errorCode: string, +): void { + socket.write( + JSON.stringify({ id, error, statusCode, errorCode }) + "\n", + ); +} + /** * Start an in-process NDJSON server that reads one request and calls * `handler` with the parsed method, params, and socket. @@ -131,14 +161,15 @@ describe("ipcCallAssistant", () => { expect(result).toEqual(expectedResult); }); - test("returns undefined when the socket does not exist", async () => { + test("throws IpcTransportError when the socket does not exist", async () => { setupWorkspace(); // No server started — socket file does not exist - const result = await ipcCallAssistant("test_method"); - expect(result).toBeUndefined(); + await expect(ipcCallAssistant("test_method")).rejects.toBeInstanceOf( + IpcTransportError, + ); }); - test("returns undefined when server returns an error field", async () => { + test("throws IpcTransportError when server returns an error without statusCode", async () => { const sockPath = setupWorkspace(); await startServer(sockPath, (id, _method, _params, socket) => { @@ -146,8 +177,29 @@ describe("ipcCallAssistant", () => { socket.end(); }); - const result = await ipcCallAssistant("failing_method"); - expect(result).toBeUndefined(); + await expect(ipcCallAssistant("failing_method")).rejects.toBeInstanceOf( + IpcTransportError, + ); + }); + + test("throws IpcHandlerError when server returns error with statusCode", async () => { + const sockPath = setupWorkspace(); + + await startServer(sockPath, (id, _method, _params, socket) => { + sendHandlerError(socket, id, "Not found", 404, "NOT_FOUND"); + socket.end(); + }); + + const promise = ipcCallAssistant("failing_method"); + await expect(promise).rejects.toBeInstanceOf(IpcHandlerError); + try { + await promise; + } catch (err) { + const handlerErr = err as IpcHandlerError; + expect(handlerErr.message).toBe("Not found"); + expect(handlerErr.statusCode).toBe(404); + expect(handlerErr.code).toBe("NOT_FOUND"); + } }); test("passes method and params to the server", async () => { @@ -227,7 +279,7 @@ describe("ipcSuggestTrustRule", () => { expect(receivedMethod).toBe("suggest_trust_rule"); }); - test("throws when the assistant returns an error field", async () => { + test("propagates IpcTransportError when the assistant returns an error field", async () => { const sockPath = setupWorkspace(); await startServer(sockPath, (id, _method, _params, socket) => { @@ -235,8 +287,8 @@ describe("ipcSuggestTrustRule", () => { socket.end(); }); - await expect(ipcSuggestTrustRule(validRequest)).rejects.toThrow( - "ipcSuggestTrustRule: unexpected response shape", + await expect(ipcSuggestTrustRule(validRequest)).rejects.toBeInstanceOf( + IpcTransportError, ); }); @@ -279,12 +331,12 @@ describe("ipcSuggestTrustRule", () => { ); }); - test("throws when the socket is unavailable (assistant not running)", async () => { + test("propagates IpcTransportError when the socket is unavailable", async () => { setupWorkspace(); - // No server — socket does not exist, ipcCallAssistant returns undefined + // No server — socket does not exist, ipcCallAssistant throws IpcTransportError. - await expect(ipcSuggestTrustRule(validRequest)).rejects.toThrow( - "ipcSuggestTrustRule: unexpected response shape", + await expect(ipcSuggestTrustRule(validRequest)).rejects.toBeInstanceOf( + IpcTransportError, ); }); }); diff --git a/gateway/src/ipc/assistant-client.ts b/gateway/src/ipc/assistant-client.ts index 9be827f1b2d..c3e98dd1599 100644 --- a/gateway/src/ipc/assistant-client.ts +++ b/gateway/src/ipc/assistant-client.ts @@ -15,7 +15,6 @@ import { connect, type Socket } from "node:net"; -import { getLogger } from "../logger.js"; import type { ScopeOption, DirectoryScopeOption } from "../risk/risk-types.js"; import { resolveIpcSocketPath } from "./socket-path.js"; @@ -49,8 +48,8 @@ interface IpcResponse { // --------------------------------------------------------------------------- /** - * Error thrown by {@link ipcCallAssistantStrict} when the daemon returns - * a handler-level error (e.g. a RouteError with statusCode). + * Error thrown by {@link ipcCallAssistant} when the daemon returns a + * handler-level error (e.g. a RouteError with statusCode). */ export class IpcHandlerError extends Error { readonly statusCode: number; @@ -65,8 +64,8 @@ export class IpcHandlerError extends Error { } /** - * Error thrown by {@link ipcCallAssistantStrict} when the daemon is - * unreachable (socket error, timeout, closed before response). + * Error thrown by {@link ipcCallAssistant} when the daemon is unreachable + * (socket error, timeout, closed before response). */ export class IpcTransportError extends Error { constructor(message: string) { @@ -87,124 +86,19 @@ function getAssistantSocketPath(): string { // One-shot IPC call to the assistant // --------------------------------------------------------------------------- -const log = getLogger("assistant-client"); - /** * One-shot IPC helper: connect to assistant.sock, call a method, disconnect. * - * Returns `undefined` on any failure (socket not found, timeout, parse error) - * so callers can fall back gracefully. Uses a 30-second call timeout to - * accommodate LLM latency on the assistant side. - */ -export async function ipcCallAssistant( - method: string, - params?: Record, -): Promise { - const socketPath = getAssistantSocketPath(); - - return new Promise((resolve) => { - let settled = false; - let callTimer: ReturnType | undefined; - - const finish = (value: unknown) => { - if (settled) return; - settled = true; - clearTimeout(connectTimer); - if (callTimer) clearTimeout(callTimer); - socket.destroy(); - resolve(value); - }; - - const connectTimer = setTimeout(() => { - log.warn( - { method, socketPath, timeoutMs: CONNECT_TIMEOUT_MS }, - "Assistant IPC connect timed out", - ); - finish(undefined); - }, CONNECT_TIMEOUT_MS); - - const socket: Socket = connect(socketPath); - socket.unref(); - - let buffer = ""; - const reqId = crypto.randomUUID(); - - socket.on("connect", () => { - clearTimeout(connectTimer); - const req: IpcRequest = { id: reqId, method, params }; - socket.write(JSON.stringify(req) + "\n"); - - callTimer = setTimeout(() => { - log.warn( - { method, socketPath, timeoutMs: CALL_TIMEOUT_MS }, - "Assistant IPC call timed out waiting for response", - ); - finish(undefined); - }, CALL_TIMEOUT_MS); - - socket.on("data", (chunk) => { - buffer += chunk.toString(); - let newlineIdx: number; - while ((newlineIdx = buffer.indexOf("\n")) !== -1) { - const line = buffer.slice(0, newlineIdx).trim(); - buffer = buffer.slice(newlineIdx + 1); - if (!line) continue; - - try { - const msg = JSON.parse(line) as IpcResponse; - if (msg.id === reqId) { - if (msg.error) { - log.warn( - { error: msg.error, method }, - "Assistant IPC call returned error", - ); - finish(undefined); - } else { - finish(msg.result); - } - return; - } - } catch { - // Ignore malformed lines - } - } - }); - }); - - socket.on("error", (err) => { - log.warn( - { - err: err instanceof Error ? err.message : String(err), - code: (err as NodeJS.ErrnoException).code ?? "unknown", - method, - socketPath, - }, - "Assistant IPC socket error", - ); - finish(undefined); - }); - - socket.on("close", () => { - if (!settled) { - log.warn( - { method, socketPath }, - "Assistant IPC socket closed before response", - ); - } - finish(undefined); - }); - }); -} - -/** - * Strict IPC call that distinguishes handler errors from transport failures. - * * - On success: resolves with the result value. - * - On handler error (RouteError): throws {@link IpcHandlerError} with - * statusCode and code. - * - On transport failure: throws {@link IpcTransportError}. + * - On handler error (assistant RouteError): throws {@link IpcHandlerError} + * with statusCode and code. + * - On transport failure (socket not found, timeout, parse error, closed + * before response): throws {@link IpcTransportError}. + * + * Uses a 30-second call timeout to accommodate LLM latency on the + * assistant side. */ -export async function ipcCallAssistantStrict( +export async function ipcCallAssistant( method: string, params?: Record, ): Promise { diff --git a/gateway/src/post-assistant-ready.ts b/gateway/src/post-assistant-ready.ts index 82355a4b953..ac6985c4533 100644 --- a/gateway/src/post-assistant-ready.ts +++ b/gateway/src/post-assistant-ready.ts @@ -18,7 +18,10 @@ import type { Database } from "bun:sqlite"; import { ensureVellumGuardianBinding } from "./auth/guardian-bootstrap.js"; import { getGatewayDb, type GatewayDb } from "./db/connection.js"; import { runDataMigrations } from "./db/data-migrations/index.js"; -import { ipcCallAssistant } from "./ipc/assistant-client.js"; +import { + IpcTransportError, + ipcCallAssistant, +} from "./ipc/assistant-client.js"; import { getLogger } from "./logger.js"; const log = getLogger("post-assistant-ready"); @@ -34,10 +37,13 @@ export async function waitForAssistant(): Promise { const deadline = Date.now() + MAX_WAIT_MS; while (Date.now() < deadline) { - const result = await ipcCallAssistant("health"); - if (result !== undefined) { + try { + await ipcCallAssistant("health"); log.info("Assistant is ready"); return true; + } catch (err) { + if (!(err instanceof IpcTransportError)) throw err; + // Transport error during startup is expected — keep polling. } await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS)); diff --git a/gateway/src/verification/voice-approval-sync.ts b/gateway/src/verification/voice-approval-sync.ts index f0d0f4d2aca..ad422244d10 100644 --- a/gateway/src/verification/voice-approval-sync.ts +++ b/gateway/src/verification/voice-approval-sync.ts @@ -73,9 +73,9 @@ async function syncVoiceApprovals(): Promise { AND updated_at > ?`, mode: "query", bind: [since], - })) as DbProxyResult | undefined; + })) as DbProxyResult; - if (!result?.rows?.length) { + if (!result.rows?.length) { lastSyncAt = now; return; }