diff --git a/assistant/Dockerfile b/assistant/Dockerfile index daeee187023..945b1c08329 100644 --- a/assistant/Dockerfile +++ b/assistant/Dockerfile @@ -22,6 +22,7 @@ COPY packages/service-contracts ./packages/service-contracts COPY packages/credential-storage ./packages/credential-storage COPY packages/egress-proxy ./packages/egress-proxy COPY packages/gateway-client ./packages/gateway-client +COPY packages/ipc-server-utils ./packages/ipc-server-utils COPY packages/skill-host-contracts ./packages/skill-host-contracts COPY packages/slack-text ./packages/slack-text COPY packages/twilio-client ./packages/twilio-client diff --git a/assistant/bun.lock b/assistant/bun.lock index b3211ddb440..8f77e16395e 100644 --- a/assistant/bun.lock +++ b/assistant/bun.lock @@ -18,6 +18,7 @@ "@vellumai/credential-storage": "file:../packages/credential-storage", "@vellumai/egress-proxy": "file:../packages/egress-proxy", "@vellumai/gateway-client": "file:../packages/gateway-client", + "@vellumai/ipc-server-utils": "file:../packages/ipc-server-utils", "@vellumai/service-contracts": "file:../packages/service-contracts", "@vellumai/skill-host-contracts": "file:../packages/skill-host-contracts", "@vellumai/slack-text": "file:../packages/slack-text", @@ -421,6 +422,8 @@ "@vellumai/gateway-client": ["@vellumai/gateway-client@file:../packages/gateway-client", { "dependencies": { "@vellumai/service-contracts": "file:../service-contracts" }, "devDependencies": { "@types/bun": "1.3.10", "typescript": "5.9.3" } }], + "@vellumai/ipc-server-utils": ["@vellumai/ipc-server-utils@file:../packages/ipc-server-utils", { "devDependencies": { "@types/bun": "1.3.10", "typescript": "5.9.3" } }], + "@vellumai/service-contracts": ["@vellumai/service-contracts@file:../packages/service-contracts", { "dependencies": { "zod": "4.3.6" }, "devDependencies": { "@types/bun": "1.2.4", "typescript": "5.7.3" } }], "@vellumai/skill-host-contracts": ["@vellumai/skill-host-contracts@file:../packages/skill-host-contracts", { 
"devDependencies": { "@types/bun": "1.3.10", "typescript": "5.9.3" } }], diff --git a/assistant/knip.json b/assistant/knip.json index 876c1002ee2..0763a1f1933 100644 --- a/assistant/knip.json +++ b/assistant/knip.json @@ -11,6 +11,7 @@ "@vellumai/credential-storage", "@vellumai/egress-proxy", "@vellumai/gateway-client", + "@vellumai/ipc-server-utils", "@vellumai/service-contracts", "@vellumai/slack-text", "@vellumai/twilio-client", diff --git a/assistant/package.json b/assistant/package.json index 962270fde8f..9821c2e77de 100644 --- a/assistant/package.json +++ b/assistant/package.json @@ -44,6 +44,7 @@ "@vellumai/credential-storage": "file:../packages/credential-storage", "@vellumai/egress-proxy": "file:../packages/egress-proxy", "@vellumai/gateway-client": "file:../packages/gateway-client", + "@vellumai/ipc-server-utils": "file:../packages/ipc-server-utils", "@vellumai/service-contracts": "file:../packages/service-contracts", "@vellumai/skill-host-contracts": "file:../packages/skill-host-contracts", "@vellumai/slack-text": "file:../packages/slack-text", @@ -78,6 +79,7 @@ "@vellumai/service-contracts", "@vellumai/egress-proxy", "@vellumai/gateway-client", + "@vellumai/ipc-server-utils", "@vellumai/skill-host-contracts", "@vellumai/slack-text", "@vellumai/twilio-client" diff --git a/assistant/src/ipc/assistant-server.ts b/assistant/src/ipc/assistant-server.ts index fde7d83f6a7..b641a337a30 100644 --- a/assistant/src/ipc/assistant-server.ts +++ b/assistant/src/ipc/assistant-server.ts @@ -28,9 +28,13 @@ * back to a shorter deterministic path so CLI commands can still connect. 
*/ -import { existsSync, mkdirSync, unlinkSync } from "node:fs"; +import { existsSync, unlinkSync } from "node:fs"; import { createServer, type Server, type Socket } from "node:net"; -import { dirname } from "node:path"; + +import { + ensureSocketDir, + SocketWatchdog, +} from "@vellumai/ipc-server-utils"; import { findLocalGuardianPrincipalId } from "../runtime/local-actor-identity.js"; import { RouteError } from "../runtime/routes/errors.js"; @@ -130,13 +134,29 @@ function isIpcBinaryResponse(value: unknown): value is IpcBinaryResponse { // Server // --------------------------------------------------------------------------- +/** Optional configuration for {@link AssistantIpcServer}. */ +export interface AssistantIpcServerOptions { + /** + * How often the socket-file watchdog stats the listening socket path. + * Set to `0` to disable. Defaults to {@link SocketWatchdog}'s 5000ms. + */ + watchdogIntervalMs?: number; +} + export class AssistantIpcServer { private server: Server | null = null; private clients = new Set(); private methods = new Map(); private socketPath: string; + private watchdog: SocketWatchdog; + /** + * Servers whose listener path has been replaced by a re-bind. Kept around + * so already-connected sockets continue to work; closed gracefully once + * their accept loops drain. 
+ */ + private legacyServers = new Set(); - constructor() { + constructor(options?: AssistantIpcServerOptions) { const resolution = resolveIpcSocketPath("assistant"); this.socketPath = resolution.path; log.info( @@ -154,62 +174,55 @@ export class AssistantIpcServer { this.methods.set("db_proxy", (params) => handleDbProxy(params as unknown as DbProxyParams), ); + + this.watchdog = new SocketWatchdog({ + socketPath: this.socketPath, + intervalMs: options?.watchdogIntervalMs, + getServer: () => this.server, + createServer: () => this.createListeningServer(), + onRebind: (newServer, oldServer) => { + this.server = newServer; + this.legacyServers.add(oldServer); + oldServer.close(() => { + this.legacyServers.delete(oldServer); + }); + }, + log, + }); } /** Start listening on the Unix domain socket. */ async start(): Promise { // Ensure the parent directory exists before listening. - const socketDir = dirname(this.socketPath); - if (!existsSync(socketDir)) { - mkdirSync(socketDir, { recursive: true, mode: 0o700 }); - } + ensureSocketDir(this.socketPath); // Probe before unlink so a second daemon can't silently orphan an active // listener (Unix lets you unlink a still-bound socket file). See // `ensureSocketPathFree` for the behavior matrix. await ensureSocketPathFree(this.socketPath); - this.server = createServer((socket) => { - this.clients.add(socket); - log.debug("IPC client connected"); - - const reader = new IpcFrameReader( - (envelope, binary) => - this.handleEnvelope(socket, reader, envelope, binary), - (err) => log.warn({ err }, "IPC frame read error"), - ); - - socket.on("data", (chunk) => { - reader.push(Buffer.isBuffer(chunk) ? 
chunk : Buffer.from(chunk)); - }); - - socket.on("close", () => { - this.clients.delete(socket); - log.debug("IPC client disconnected"); - }); - - socket.on("error", (err) => { - log.warn({ err }, "IPC client socket error"); - this.clients.delete(socket); - }); - }); - - this.server.on("error", (err) => { - log.error({ err }, "Assistant IPC server error"); - }); - + this.server = this.createListeningServer(); this.server.listen(this.socketPath, () => { log.info({ path: this.socketPath }, "Assistant IPC server listening"); }); + + this.watchdog.start(); } /** Stop the server and disconnect all clients. */ stop(): void { + this.watchdog.stop(); + for (const client of this.clients) { if (!client.destroyed) client.destroy(); } this.clients.clear(); + for (const legacy of this.legacyServers) { + legacy.close(); + } + this.legacyServers.clear(); + if (this.server) { this.server.close(); this.server = null; @@ -229,8 +242,52 @@ export class AssistantIpcServer { return this.socketPath; } + /** + * Re-bind the listening socket if its path entry is missing on disk. + * + * Public for tests so the watchdog can be exercised deterministically + * without waiting for the interval. Returns `true` when a re-bind was + * performed, `false` otherwise. + */ + async rebindIfMissing(): Promise { + return this.watchdog.rebindIfMissing(); + } + // ── Internal ────────────────────────────────────────────────────────── + private createListeningServer(): Server { + const server = createServer((socket) => { + this.clients.add(socket); + log.debug("IPC client connected"); + + const reader = new IpcFrameReader( + (envelope, binary) => + this.handleEnvelope(socket, reader, envelope, binary), + (err) => log.warn({ err }, "IPC frame read error"), + ); + + socket.on("data", (chunk) => { + reader.push(Buffer.isBuffer(chunk) ? 
chunk : Buffer.from(chunk)); + }); + + socket.on("close", () => { + this.clients.delete(socket); + log.debug("IPC client disconnected"); + }); + + socket.on("error", (err) => { + log.warn({ err }, "IPC client socket error"); + this.clients.delete(socket); + }); + }); + + server.on("error", (err) => { + log.error({ err }, "Assistant IPC server error"); + }); + + return server; + } + private handleEnvelope( socket: Socket, reader: IpcFrameReader, diff --git a/assistant/src/ipc/skill-server.ts b/assistant/src/ipc/skill-server.ts index 69ec5818039..b390622cb19 100644 --- a/assistant/src/ipc/skill-server.ts +++ b/assistant/src/ipc/skill-server.ts @@ -37,9 +37,13 @@ * back to a shorter deterministic path via the shared socket-path resolver. */ -import { existsSync, mkdirSync, unlinkSync } from "node:fs"; +import { existsSync, unlinkSync } from "node:fs"; import { createServer, type Server, type Socket } from "node:net"; -import { dirname } from "node:path"; + +import { + ensureSocketDir, + SocketWatchdog, +} from "@vellumai/ipc-server-utils"; import { type SkillRouteHandle, @@ -233,6 +237,15 @@ class SkillIpcConnectionState implements SkillIpcConnection { // Server // --------------------------------------------------------------------------- +/** Optional configuration for {@link SkillIpcServer}. */ +export interface SkillIpcServerOptions { + /** + * How often the socket-file watchdog stats the listening socket path. + * Set to `0` to disable. Defaults to {@link SocketWatchdog}'s 5000ms. + */ + watchdogIntervalMs?: number; +} + export class SkillIpcServer { private server: Server | null = null; private clients = new Set(); @@ -252,8 +265,15 @@ export class SkillIpcServer { private connections = new WeakMap(); private nextConnectionId = 1; private socketPath: string; + private watchdog: SocketWatchdog; + /** + * Servers whose listener path has been replaced by a re-bind. 
Kept around + * so already-connected sockets continue to work; closed gracefully once + * their accept loops drain. + */ + private legacyServers = new Set(); - constructor() { + constructor(options?: SkillIpcServerOptions) { const resolution = resolveSkillIpcSocketPath(); this.socketPath = resolution.path; log.info( @@ -266,6 +286,21 @@ export class SkillIpcServer { for (const route of skillIpcStreamingRoutes) { this.streamingMethods.set(route.method, route.handler); } + + this.watchdog = new SocketWatchdog({ + socketPath: this.socketPath, + intervalMs: options?.watchdogIntervalMs, + getServer: () => this.server, + createServer: () => this.createListeningServer(), + onRebind: (newServer, oldServer) => { + this.server = newServer; + this.legacyServers.add(oldServer); + oldServer.close(() => { + this.legacyServers.delete(oldServer); + }); + }, + log, + }); } /** Register an additional method handler after construction. */ @@ -349,17 +384,71 @@ export class SkillIpcServer { /** Start listening on the Unix domain socket. */ async start(): Promise { // Ensure the parent directory exists before listening. - const socketDir = dirname(this.socketPath); - if (!existsSync(socketDir)) { - mkdirSync(socketDir, { recursive: true, mode: 0o700 }); - } + ensureSocketDir(this.socketPath); // Probe before unlink so a second daemon can't silently orphan an active // listener (Unix lets you unlink a still-bound socket file). See // `ensureSocketPathFree` for the behavior matrix. await ensureSocketPathFree(this.socketPath); - this.server = createServer((socket) => { + this.server = this.createListeningServer(); + this.server.listen(this.socketPath, () => { + log.info({ path: this.socketPath }, "Skill IPC server listening"); + }); + + this.watchdog.start(); + } + + /** Stop the server and disconnect all clients. 
*/ + stop(): void { + this.watchdog.stop(); + + for (const client of this.clients) { + this.teardownSubscriptions(client); + this.teardownConnection(client); + if (!client.destroyed) client.destroy(); + } + this.clients.clear(); + + for (const legacy of this.legacyServers) { + legacy.close(); + } + this.legacyServers.clear(); + + if (this.server) { + this.server.close(); + this.server = null; + } + + if (existsSync(this.socketPath)) { + try { + unlinkSync(this.socketPath); + } catch { + // Ignore + } + } + } + + /** Get the socket path (for diagnostics). */ + getSocketPath(): string { + return this.socketPath; + } + + /** + * Re-bind the listening socket if its path entry is missing on disk. + * + * Public for tests so the watchdog can be exercised deterministically + * without waiting for the interval. Returns `true` when a re-bind was + * performed, `false` otherwise. + */ + async rebindIfMissing(): Promise { + return this.watchdog.rebindIfMissing(); + } + + // ── Internal ────────────────────────────────────────────────────────── + + private createListeningServer(): Server { + const server = createServer((socket) => { this.clients.add(socket); const connection = new SkillIpcConnectionState( `skill-ipc-${this.nextConnectionId++}`, @@ -406,45 +495,13 @@ export class SkillIpcServer { }); }); - this.server.on("error", (err) => { + server.on("error", (err) => { log.error({ err }, "Skill IPC server error"); }); - this.server.listen(this.socketPath, () => { - log.info({ path: this.socketPath }, "Skill IPC server listening"); - }); - } - - /** Stop the server and disconnect all clients. 
*/ - stop(): void { - for (const client of this.clients) { - this.teardownSubscriptions(client); - this.teardownConnection(client); - if (!client.destroyed) client.destroy(); - } - this.clients.clear(); - - if (this.server) { - this.server.close(); - this.server = null; - } - - if (existsSync(this.socketPath)) { - try { - unlinkSync(this.socketPath); - } catch { - // Ignore - } - } - } - - /** Get the socket path (for diagnostics). */ - getSocketPath(): string { - return this.socketPath; + return server; } - // ── Internal ────────────────────────────────────────────────────────── - private handleMessage(socket: Socket, line: string): void { let frame: IpcRequest & { result?: unknown; error?: string }; try { diff --git a/gateway/Dockerfile b/gateway/Dockerfile index 538fa8a7565..e2bcc1e342c 100644 --- a/gateway/Dockerfile +++ b/gateway/Dockerfile @@ -11,6 +11,7 @@ COPY --from=bun /usr/local/bin/bun /usr/local/bin/bun # Copy shared packages needed by gateway's repo-local dependencies COPY packages/assistant-client ./packages/assistant-client COPY packages/ces-client ./packages/ces-client +COPY packages/ipc-server-utils ./packages/ipc-server-utils COPY packages/service-contracts ./packages/service-contracts COPY packages/slack-text ./packages/slack-text COPY packages/twilio-client ./packages/twilio-client diff --git a/gateway/bun.lock b/gateway/bun.lock index 866381fddaf..201af9b8144 100644 --- a/gateway/bun.lock +++ b/gateway/bun.lock @@ -7,6 +7,7 @@ "dependencies": { "@vellumai/assistant-client": "file:../packages/assistant-client", "@vellumai/ces-client": "file:../packages/ces-client", + "@vellumai/ipc-server-utils": "file:../packages/ipc-server-utils", "@vellumai/service-contracts": "file:../packages/service-contracts", "@vellumai/slack-text": "file:../packages/slack-text", "@vellumai/twilio-client": "file:../packages/twilio-client", @@ -209,6 +210,8 @@ "@vellumai/ces-client": ["@vellumai/ces-client@file:../packages/ces-client", { "dependencies": { 
"@vellumai/service-contracts": "file:../service-contracts" }, "devDependencies": { "@types/bun": "1.2.4", "typescript": "5.7.3" } }], + "@vellumai/ipc-server-utils": ["@vellumai/ipc-server-utils@file:../packages/ipc-server-utils", { "devDependencies": { "@types/bun": "1.3.10", "typescript": "5.9.3" } }], + "@vellumai/service-contracts": ["@vellumai/service-contracts@file:../packages/service-contracts", { "dependencies": { "zod": "4.3.6" }, "devDependencies": { "@types/bun": "1.2.4", "typescript": "5.7.3" } }], "@vellumai/slack-text": ["@vellumai/slack-text@file:../packages/slack-text", { "devDependencies": { "@types/bun": "1.3.10", "typescript": "5.9.3" } }], @@ -497,10 +500,12 @@ "@vellumai/ces-client/@types/bun": ["@types/bun@1.2.4", "", { "dependencies": { "bun-types": "1.2.4" } }, "sha512-QtuV5OMR8/rdKJs213iwXDpfVvnskPXY/S0ZiFbsTjQZycuqPbMW8Gf/XhLfwE5njW8sxI2WjISURXPlHypMFA=="], - "@vellumai/ces-client/@vellumai/service-contracts": ["@vellumai/service-contracts@file:../packages/service-contracts", {}], + "@vellumai/ces-client/@vellumai/service-contracts": ["@vellumai/service-contracts@file:../packages/service-contracts", { "dependencies": { "zod": "4.3.6" }, "devDependencies": { "@types/bun": "1.2.4", "typescript": "5.7.3" } }], "@vellumai/ces-client/typescript": ["typescript@5.7.3", "", { "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" } }, "sha512-84MVSjMEHP+FQRPy3pX9sTVV/INIex71s9TL2Gm5FG/WG1SqXeKyZ0k7/blY/4FdOzI12CBy1vGc4og/eus0fw=="], + "@vellumai/ipc-server-utils/@types/bun": ["@types/bun@1.3.10", "", { "dependencies": { "bun-types": "1.3.10" } }, "sha512-0+rlrUrOrTSskibryHbvQkDOWRJwJZqZlxrUs1u4oOoTln8+WIXBPmAuCF35SWB2z4Zl3E84Nl/D0P7803nigQ=="], + "@vellumai/service-contracts/@types/bun": ["@types/bun@1.2.4", "", { "dependencies": { "bun-types": "1.2.4" } }, "sha512-QtuV5OMR8/rdKJs213iwXDpfVvnskPXY/S0ZiFbsTjQZycuqPbMW8Gf/XhLfwE5njW8sxI2WjISURXPlHypMFA=="], "@vellumai/service-contracts/typescript": ["typescript@5.7.3", "", { "bin": { "tsc": 
"bin/tsc", "tsserver": "bin/tsserver" } }, "sha512-84MVSjMEHP+FQRPy3pX9sTVV/INIex71s9TL2Gm5FG/WG1SqXeKyZ0k7/blY/4FdOzI12CBy1vGc4og/eus0fw=="], @@ -567,6 +572,8 @@ "@vellumai/ces-client/@types/bun/bun-types": ["bun-types@1.2.4", "", { "dependencies": { "@types/node": "*", "@types/ws": "~8.5.10" } }, "sha512-nDPymR207ZZEoWD4AavvEaa/KZe/qlrbMSchqpQwovPZCKc7pwMoENjEtHgMKaAjJhy+x6vfqSBA1QU3bJgs0Q=="], + "@vellumai/ipc-server-utils/@types/bun/bun-types": ["bun-types@1.3.10", "", { "dependencies": { "@types/node": "*" } }, "sha512-tcpfCCl6XWo6nCVnpcVrxQ+9AYN1iqMIzgrSKYMB/fjLtV2eyAVEg7AxQJuCq/26R6HpKWykQXuSOq/21RYcbg=="], + "@vellumai/service-contracts/@types/bun/bun-types": ["bun-types@1.2.4", "", { "dependencies": { "@types/node": "*", "@types/ws": "~8.5.10" } }, "sha512-nDPymR207ZZEoWD4AavvEaa/KZe/qlrbMSchqpQwovPZCKc7pwMoENjEtHgMKaAjJhy+x6vfqSBA1QU3bJgs0Q=="], "@vellumai/slack-text/@types/bun/bun-types": ["bun-types@1.3.10", "", { "dependencies": { "@types/node": "*" } }, "sha512-tcpfCCl6XWo6nCVnpcVrxQ+9AYN1iqMIzgrSKYMB/fjLtV2eyAVEg7AxQJuCq/26R6HpKWykQXuSOq/21RYcbg=="], diff --git a/gateway/knip.json b/gateway/knip.json index 6bd75be0cc0..fb4cfff1db6 100644 --- a/gateway/knip.json +++ b/gateway/knip.json @@ -4,6 +4,7 @@ "ignoreDependencies": [ "@vellumai/assistant-client", "@vellumai/ces-client", + "@vellumai/ipc-server-utils", "@vellumai/service-contracts", "@vellumai/slack-text", "@vellumai/twilio-client" diff --git a/gateway/package.json b/gateway/package.json index 7507b864ee6..525c74a8143 100644 --- a/gateway/package.json +++ b/gateway/package.json @@ -26,6 +26,7 @@ "dependencies": { "@vellumai/assistant-client": "file:../packages/assistant-client", "@vellumai/ces-client": "file:../packages/ces-client", + "@vellumai/ipc-server-utils": "file:../packages/ipc-server-utils", "@vellumai/service-contracts": "file:../packages/service-contracts", "@vellumai/slack-text": "file:../packages/slack-text", "@vellumai/twilio-client": "file:../packages/twilio-client", diff --git 
a/gateway/src/__tests__/ipc-server-watchdog.test.ts b/gateway/src/__tests__/ipc-server-watchdog.test.ts index 7d20672a7ea..f7558050750 100644 --- a/gateway/src/__tests__/ipc-server-watchdog.test.ts +++ b/gateway/src/__tests__/ipc-server-watchdog.test.ts @@ -9,6 +9,12 @@ import "./test-preload.js"; import { GatewayIpcServer, type IpcRoute } from "../ipc/server.js"; +// Integration tests for GatewayIpcServer's watchdog wiring. The watchdog's +// own unit tests (race guards, timer error handling, etc.) live in +// `@vellumai/ipc-server-utils`. These tests verify that the gateway server +// correctly wires the watchdog into its own lifecycle and legacy-server +// bookkeeping. + // macOS caps Unix socket paths at sizeof(sun_path)-1 == 103 chars, so the // shared test-preload temp dir is too long. Mint our own short path under // the system tmpdir for this test. @@ -69,12 +75,24 @@ const echoRoute: IpcRoute = { * resolves the path via env-var defaults that may not point at our temp * dir, so we override the private `socketPath` field directly — same * pattern used by `ipc-server-multi-client.test.ts`. + * + * Note: the watchdog is constructed in the GatewayIpcServer constructor + * and is handed the original (unmocked) socketPath at that point. Tests + * that need a deterministic re-bind disable the timer + * (`watchdogIntervalMs: 0`) and call the public `rebindIfMissing()` entry + * point; because the watchdog keeps its own `socketPath` copy, that copy + * must be monkeypatched as well. See {@link buildServer}. */ function buildServer(opts: { watchdogIntervalMs: number }): GatewayIpcServer { const server = new GatewayIpcServer([echoRoute], { watchdogIntervalMs: opts.watchdogIntervalMs, }); + // The watchdog captures socketPath in its constructor, so override both + // the server's private field (for start()/stop()) and the watchdog's copy. 
(server as unknown as { socketPath: string }).socketPath = socketPath; + const watchdog = (server as unknown as { watchdog: { socketPath: string } }) + .watchdog; + watchdog.socketPath = socketPath; return server; } @@ -88,7 +106,7 @@ async function waitForListening(path: string, timeoutMs = 1000): Promise { } } -describe("GatewayIpcServer socket-file watchdog", () => { +describe("GatewayIpcServer watchdog wiring", () => { let server: GatewayIpcServer | undefined; const sockets: Socket[] = []; @@ -116,189 +134,56 @@ describe("GatewayIpcServer socket-file watchdog", () => { } }); - test("rebindIfMissing is a no-op when the socket path exists", async () => { + test("rebindIfMissing restores the listener and accepts new clients end-to-end", async () => { server = buildServer({ watchdogIntervalMs: 0 }); server.start(); await waitForListening(socketPath); - expect(existsSync(socketPath)).toBe(true); - const rebound = await server.rebindIfMissing(); - expect(rebound).toBe(false); - expect(existsSync(socketPath)).toBe(true); - }); - - test("rebindIfMissing is a no-op when the server has not been started", async () => { - server = buildServer({ watchdogIntervalMs: 0 }); - const rebound = await server.rebindIfMissing(); - expect(rebound).toBe(false); - }); - - test("rebindIfMissing recreates the path entry after an external unlink", async () => { - server = buildServer({ watchdogIntervalMs: 0 }); - server.start(); - await waitForListening(socketPath); - expect(existsSync(socketPath)).toBe(true); + // A baseline client confirms the initial listener is healthy. + const baseline = await connectClient(socketPath); + sockets.push(baseline); + const baselineEcho = await sendRequest(baseline, "echo", { value: "pre" }); + expect(baselineEcho.result).toEqual({ echoed: "pre" }); + // Simulate the cleanup that wipes /run/* — unlink the socket file + // while the listening fd is still alive in the kernel. 
unlinkSync(socketPath); expect(existsSync(socketPath)).toBe(false); const rebound = await server.rebindIfMissing(); expect(rebound).toBe(true); expect(existsSync(socketPath)).toBe(true); - }); - - test("a fresh client can connect and call a method after re-bind", async () => { - server = buildServer({ watchdogIntervalMs: 0 }); - server.start(); - await waitForListening(socketPath); - - unlinkSync(socketPath); - await server.rebindIfMissing(); - expect(existsSync(socketPath)).toBe(true); - - const client = await connectClient(socketPath); - sockets.push(client); - - const response = await sendRequest(client, "echo", { value: "after-rebind" }); - expect(response.error).toBeUndefined(); - expect(response.result).toEqual({ echoed: "after-rebind" }); - }); - - test("a client connected before the unlink can still send and receive", async () => { - server = buildServer({ watchdogIntervalMs: 0 }); - server.start(); - await waitForListening(socketPath); - - const persistent = await connectClient(socketPath); - sockets.push(persistent); - - // Round-trip once before the disruption to confirm the connection is good. - const before = await sendRequest(persistent, "echo", { value: "before" }); - expect(before.result).toEqual({ echoed: "before" }); - - unlinkSync(socketPath); - await server.rebindIfMissing(); - - // The kernel keeps the existing connection alive even though the - // listener path was replaced; in-flight RPCs continue to work because - // they ride the same already-connected socket. 
- const after = await sendRequest(persistent, "echo", { value: "after" }); - expect(after.error).toBeUndefined(); - expect(after.result).toEqual({ echoed: "after" }); - }); - - test("the periodic watchdog re-binds without manual intervention", async () => { - server = buildServer({ watchdogIntervalMs: 25 }); - server.start(); - await waitForListening(socketPath); - - unlinkSync(socketPath); - expect(existsSync(socketPath)).toBe(false); - - // Wait up to 1s for the watchdog to notice and re-bind. - const deadline = Date.now() + 1000; - while (!existsSync(socketPath) && Date.now() < deadline) { - await new Promise((r) => setTimeout(r, 25)); - } - expect(existsSync(socketPath)).toBe(true); - // And it should be a healthy listener — verify with a fresh client. - const client = await connectClient(socketPath); - sockets.push(client); - const response = await sendRequest(client, "echo", { value: "watchdog" }); - expect(response.result).toEqual({ echoed: "watchdog" }); + // A new client can connect to the re-bound listener and exercise the + // route table — proving onRebind correctly installed the new server + // as the primary. + const fresh = await connectClient(socketPath); + sockets.push(fresh); + const freshEcho = await sendRequest(fresh, "echo", { value: "post" }); + expect(freshEcho.result).toEqual({ echoed: "post" }); + + // The pre-existing client survives the rebind because its connected + // socket inode lives independently of the listener path. + expect(baseline.destroyed).toBe(false); }); - test("stop() cancels the watchdog timer and cleans up the path", async () => { - server = buildServer({ watchdogIntervalMs: 25 }); + test("stop() halts the watchdog so a later unlink does not resurrect the listener", async () => { + server = buildServer({ watchdogIntervalMs: 10 }); server.start(); await waitForListening(socketPath); server.stop(); - server = undefined; - - // After stop, the path is cleaned up. 
If the timer were still alive it - // would log "missing on disk" warnings indefinitely; verify no fresh - // socket file appears after some idle time. - await new Promise((r) => setTimeout(r, 100)); expect(existsSync(socketPath)).toBe(false); - }); - - test("rebindIfMissing aborts cleanly when shutdown happens mid-listen", async () => { - server = buildServer({ watchdogIntervalMs: 0 }); - server.start(); - await waitForListening(socketPath); - - // Trigger the path-missing condition so the rebind actually engages. - unlinkSync(socketPath); - - // Async functions run synchronously up to the first await — this - // returns to us with newServer.listen() in flight. - const inFlight = server.rebindIfMissing(); - // Simulate stop() racing the rebind: just clear the live server - // pointer. We can't call full stop() here because we want to observe - // exactly the post-listen race-guard branch and not a server already - // closed by the time listen resolves; clearing the field is the same - // signal stop() raises (see stop()'s `this.server = null`). - (server as unknown as { server: null }).server = null; - - const result = await inFlight; - expect(result).toBe(false); - - // The discarded newServer should have been closed AND its path - // unlinked, so we don't leak a stale listener after "shutdown". + // Even if something recreated and removed the path again, the watchdog + // has been stopped and rebindIfMissing returns false because the + // server reference was nulled. + const rebound = await server.rebindIfMissing(); + expect(rebound).toBe(false); expect(existsSync(socketPath)).toBe(false); - // The race guard must NOT have resurrected the listener. - expect( - (server as unknown as { server: unknown }).server, - ).toBeNull(); - - // Manual cleanup — afterEach will try to call stop() on a server - // that's already in a partially-broken state, which is fine because - // stop() is null-safe on this.server. 
- }); - - test("watchdog timer catches synchronous rebind errors so unhandled rejections don't escape", async () => { - server = buildServer({ watchdogIntervalMs: 25 }); - server.start(); - await waitForListening(socketPath); - - const unhandledRejections: unknown[] = []; - const onUnhandled = (err: unknown) => { - unhandledRejections.push(err); - }; - process.on("unhandledRejection", onUnhandled); - - try { - // Force rebindIfMissing to throw synchronously by replacing - // ensureSocketDir on the live instance — simulates mkdirSync - // failing (e.g. EACCES on a read-only fs). - let throwCount = 0; - ( - server as unknown as { ensureSocketDir: () => void } - ).ensureSocketDir = () => { - throwCount++; - throw new Error("simulated mkdirSync failure"); - }; - - // Trigger the path-missing condition so each tick engages the - // throwing code path. - unlinkSync(socketPath); - - // Wait for several watchdog ticks (~5 ticks at 25ms = 125ms). - await new Promise((r) => setTimeout(r, 200)); - - // The timer must have fired multiple times — proving the - // rejection didn't kill it. - expect(throwCount).toBeGreaterThanOrEqual(2); - - // No unhandled rejections must have escaped the catch() - // wrapper in start()'s setInterval. - expect(unhandledRejections).toEqual([]); - } finally { - process.off("unhandledRejection", onUnhandled); - } + // Wait past several timer ticks to confirm no background rebind fires. + await new Promise((r) => setTimeout(r, 50)); + expect(existsSync(socketPath)).toBe(false); }); }); diff --git a/gateway/src/ipc/server.ts b/gateway/src/ipc/server.ts index bf9378f408e..66464996d00 100644 --- a/gateway/src/ipc/server.ts +++ b/gateway/src/ipc/server.ts @@ -11,16 +11,19 @@ * volume. On platforms with strict AF_UNIX path limits, the server falls back * to a shorter deterministic path. * - * Resilience: the server runs a watchdog timer that re-binds the listening - * socket when its on-disk path entry has been removed (e.g. 
by a tmpfs sweep - * or rogue cleanup of `/run/*`). Existing connected sockets survive the - * re-bind because the kernel keeps connection inodes alive independently of - * the listener path; only new `connect()` calls require the path to exist. + * Resilience: a {@link SocketWatchdog} re-binds the listening socket when its + * on-disk path entry is removed (e.g. by a tmpfs sweep or rogue cleanup of + * `/run/*`). Existing connected sockets survive the re-bind because the + * kernel keeps connection inodes alive independently of the listener path; + * only new `connect()` calls require the path to exist. */ -import { existsSync, mkdirSync, unlinkSync } from "node:fs"; +import { + SocketWatchdog, + ensureSocketDir, +} from "@vellumai/ipc-server-utils"; +import { existsSync, unlinkSync } from "node:fs"; import { createServer, type Server, type Socket } from "node:net"; -import { dirname } from "node:path"; import type { z } from "zod"; @@ -64,16 +67,12 @@ export type IpcRoute = { /** Optional configuration for {@link GatewayIpcServer}. */ export interface GatewayIpcServerOptions { /** - * How often to check whether the listening socket path still exists on - * disk. When the path has been removed (tmpfs sweep, manual `rm`, etc.) - * the server re-binds atomically. Set to `0` to disable. Defaults to - * 5000ms. + * How often the socket-file watchdog stats the listening socket path. + * Set to `0` to disable. Defaults to {@link SocketWatchdog}'s 5000ms. 
*/ watchdogIntervalMs?: number; } -const DEFAULT_WATCHDOG_INTERVAL_MS = 5000; - // --------------------------------------------------------------------------- // Server // --------------------------------------------------------------------------- @@ -84,21 +83,17 @@ export class GatewayIpcServer { private methods = new Map(); private schemas = new Map(); private socketPath: string; - private watchdogIntervalMs: number; - private watchdogHandle: ReturnType | null = null; + private watchdog: SocketWatchdog; /** * Servers whose listener path has been replaced by a re-bind. Kept around - * so that already-connected sockets continue to work; closed once their - * accept loops shut down (which happens immediately because the path no - * longer routes new connects to them). + * so that already-connected sockets continue to work; closed gracefully + * once their accept loops drain. */ private legacyServers = new Set(); constructor(routes?: IpcRoute[], options?: GatewayIpcServerOptions) { const resolution = resolveIpcSocketPath("gateway"); this.socketPath = resolution.path; - this.watchdogIntervalMs = - options?.watchdogIntervalMs ?? DEFAULT_WATCHDOG_INTERVAL_MS; log.info( { source: resolution.source, path: resolution.path }, "Gateway IPC socket path resolved", @@ -111,13 +106,33 @@ export class GatewayIpcServer { } } } + + this.watchdog = new SocketWatchdog({ + socketPath: this.socketPath, + intervalMs: options?.watchdogIntervalMs, + getServer: () => this.server, + createServer: () => this.createListeningServer(), + onRebind: (newServer, oldServer) => { + this.server = newServer; + // Move the previous listener into the legacy set so already- + // connected clients keep their accept loop alive. close() stops + // accepting new connections (which the kernel already won't route + // here anyway after the path moved) but lets in-flight sockets + // drain. 
+ this.legacyServers.add(oldServer); + oldServer.close(() => { + this.legacyServers.delete(oldServer); + }); + }, + log, + }); } /** Start listening on the Unix domain socket. */ start(): void { // Ensure the parent directory exists — on a fresh hatch the workspace // dir may not have been created yet when the IPC server starts. - this.ensureSocketDir(); + ensureSocketDir(this.socketPath); // Clean up stale socket file from a previous run if (existsSync(this.socketPath)) { @@ -133,29 +148,12 @@ export class GatewayIpcServer { log.info({ path: this.socketPath }, "IPC server listening"); }); - if (this.watchdogIntervalMs > 0 && this.watchdogHandle === null) { - this.watchdogHandle = setInterval(() => { - // Catch synchronous throws from the entry path of rebindIfMissing - // (e.g. ensureSocketDir → mkdirSync EACCES) so the timer doesn't - // spew unhandled-rejection noise every 5s on a read-only fs. - this.rebindIfMissing().catch((err) => { - log.error( - { err, path: this.socketPath }, - "Watchdog rebind failed unexpectedly", - ); - }); - }, this.watchdogIntervalMs); - // Don't keep the event loop alive just for this watchdog. - this.watchdogHandle.unref?.(); - } + this.watchdog.start(); } /** Stop the server and disconnect all clients. */ stop(): void { - if (this.watchdogHandle !== null) { - clearInterval(this.watchdogHandle); - this.watchdogHandle = null; - } + this.watchdog.stop(); for (const socket of this.clients) { if (!socket.destroyed) { @@ -205,105 +203,14 @@ export class GatewayIpcServer { * * Public for tests so the watchdog can be exercised deterministically * without waiting for the interval. Returns `true` when a re-bind was - * performed, `false` when the socket was already healthy or the server - * is not running. + * performed, `false` otherwise. 
*/ async rebindIfMissing(): Promise { - if (this.server === null) return false; - if (existsSync(this.socketPath)) return false; - - // Snapshot the current listener so we can detect a generation change - // (stop()/restart/concurrent rebind) after the async listen() resolves. - const initialServer = this.server; - - log.warn( - { path: this.socketPath }, - "IPC socket path missing on disk — re-binding listener", - ); - - this.ensureSocketDir(); - - const newServer = this.createListeningServer(); - try { - await new Promise((resolve, reject) => { - const onError = (err: unknown) => { - newServer.off("listening", onListening); - reject(err); - }; - const onListening = () => { - newServer.off("error", onError); - resolve(); - }; - newServer.once("error", onError); - newServer.once("listening", onListening); - newServer.listen(this.socketPath); - }); - } catch (err) { - log.error( - { err, path: this.socketPath }, - "Failed to re-bind IPC socket — will retry on next watchdog tick", - ); - // Best-effort cleanup of the half-initialized server. - try { - newServer.close(); - } catch { - /* ignore */ - } - return false; - } - - // Race guard: while we were awaiting listen(), stop() may have - // cleared this.server, or some other path may have replaced it. - // Installing newServer now would resurrect the listener after - // shutdown (keeping the process alive and accepting IPC again). - // Discard the new server instead. - if (this.server !== initialServer) { - try { - newServer.close(); - } catch { - /* ignore */ - } - // newServer.listen() recreated the path on disk; stop() may have - // already unlinked it, but if our listen won the race the file - // is sitting there — clean it up so it doesn't shadow a future - // start(). 
- if (existsSync(this.socketPath)) { - try { - unlinkSync(this.socketPath); - } catch { - /* ignore */ - } - } - log.warn( - { path: this.socketPath }, - "IPC server state changed during rebind — discarded new listener", - ); - return false; - } - - // Move the previous listener into the legacy set so already-connected - // clients keep their accept loop alive. Close it gracefully — `close()` - // stops accepting new connections (which the kernel already won't route - // here anyway after the path moved) but lets in-flight sockets drain. - this.server = newServer; - this.legacyServers.add(initialServer); - initialServer.close(() => { - this.legacyServers.delete(initialServer); - }); - - log.info({ path: this.socketPath }, "IPC socket re-bound after path loss"); - return true; + return this.watchdog.rebindIfMissing(); } // ── Internal ────────────────────────────────────────────────────────── - private ensureSocketDir(): void { - const socketDir = dirname(this.socketPath); - if (!existsSync(socketDir)) { - mkdirSync(socketDir, { recursive: true }); - } - } - private createListeningServer(): Server { const server = createServer((socket) => this.handleConnection(socket)); server.on("error", (err) => { diff --git a/packages/ipc-server-utils/bun.lock b/packages/ipc-server-utils/bun.lock new file mode 100644 index 00000000000..f63c70a280e --- /dev/null +++ b/packages/ipc-server-utils/bun.lock @@ -0,0 +1,24 @@ +{ + "lockfileVersion": 1, + "configVersion": 1, + "workspaces": { + "": { + "name": "@vellumai/ipc-server-utils", + "devDependencies": { + "@types/bun": "1.3.10", + "typescript": "5.9.3", + }, + }, + }, + "packages": { + "@types/bun": ["@types/bun@1.3.10", "", { "dependencies": { "bun-types": "1.3.10" } }, "sha512-0+rlrUrOrTSskibryHbvQkDOWRJwJZqZlxrUs1u4oOoTln8+WIXBPmAuCF35SWB2z4Zl3E84Nl/D0P7803nigQ=="], + + "@types/node": ["@types/node@25.6.0", "", { "dependencies": { "undici-types": "~7.19.0" } }, 
"sha512-+qIYRKdNYJwY3vRCZMdJbPLJAtGjQBudzZzdzwQYkEPQd+PJGixUL5QfvCLDaULoLv+RhT3LDkwEfKaAkgSmNQ=="], + + "bun-types": ["bun-types@1.3.10", "", { "dependencies": { "@types/node": "*" } }, "sha512-tcpfCCl6XWo6nCVnpcVrxQ+9AYN1iqMIzgrSKYMB/fjLtV2eyAVEg7AxQJuCq/26R6HpKWykQXuSOq/21RYcbg=="], + + "typescript": ["typescript@5.9.3", "", { "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" } }, "sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw=="], + + "undici-types": ["undici-types@7.19.2", "", {}, "sha512-qYVnV5OEm2AW8cJMCpdV20CDyaN3g0AjDlOGf1OW4iaDEx8MwdtChUp4zu4H0VP3nDRF/8RKWH+IPp9uW0YGZg=="], + } +} diff --git a/packages/ipc-server-utils/package.json b/packages/ipc-server-utils/package.json new file mode 100644 index 00000000000..ff64631f1dc --- /dev/null +++ b/packages/ipc-server-utils/package.json @@ -0,0 +1,18 @@ +{ + "name": "@vellumai/ipc-server-utils", + "version": "0.0.1", + "private": true, + "license": "MIT", + "type": "module", + "exports": { + ".": "./src/index.ts" + }, + "scripts": { + "typecheck": "bunx tsc --noEmit", + "test": "bun test src/" + }, + "devDependencies": { + "@types/bun": "1.3.10", + "typescript": "5.9.3" + } +} diff --git a/packages/ipc-server-utils/src/index.ts b/packages/ipc-server-utils/src/index.ts new file mode 100644 index 00000000000..0e066b399a8 --- /dev/null +++ b/packages/ipc-server-utils/src/index.ts @@ -0,0 +1,6 @@ +export { + SocketWatchdog, + ensureSocketDir, + type SocketWatchdogOptions, + type SocketWatchdogLogger, +} from "./socket-watchdog.js"; diff --git a/packages/ipc-server-utils/src/socket-watchdog.test.ts b/packages/ipc-server-utils/src/socket-watchdog.test.ts new file mode 100644 index 00000000000..d1aee0f5928 --- /dev/null +++ b/packages/ipc-server-utils/src/socket-watchdog.test.ts @@ -0,0 +1,430 @@ +import { + afterAll, + afterEach, + beforeEach, + describe, + expect, + test, +} from "bun:test"; +import { + existsSync, + mkdtempSync, + rmSync, + unlinkSync, +} from 
"node:fs"; +import { createConnection, createServer, type Server, type Socket } from "node:net"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; + +import { SocketWatchdog, type SocketWatchdogLogger } from "./socket-watchdog.js"; + +// macOS caps Unix-socket paths at sizeof(sun_path)-1 == 103 bytes, so the +// shared test-preload temp dir is too long. Mint a short path under tmpdir +// for these tests. +const shortRoot = mkdtempSync(join(tmpdir(), "vmw-")); +const socketPath = join(shortRoot, "g.sock"); + +afterAll(() => { + try { + rmSync(shortRoot, { recursive: true, force: true }); + } catch { + // best-effort + } +}); + +interface TestHarness { + watchdog: SocketWatchdog; + /** Mutated by tests to simulate stop()/restart. */ + serverRef: { current: Server | null }; + /** Servers handed to onRebind, captured for assertions + cleanup. */ + rebinds: Array<{ newServer: Server; oldServer: Server }>; + log: SocketWatchdogLogger; + loggedErrors: Array<{ obj: object; msg?: string }>; + /** Tracks every server the harness factory produced, for cleanup. */ + spawnedServers: Server[]; +} + +interface BuildOptions { + intervalMs?: number; + createServerOverride?: () => Server; + /** Override `getServer` to simulate races. 
*/ + getServerOverride?: () => Server | null; +} + +function buildHarness(opts: BuildOptions): TestHarness { + const serverRef: { current: Server | null } = { current: null }; + const rebinds: Array<{ newServer: Server; oldServer: Server }> = []; + const loggedErrors: Array<{ obj: object; msg?: string }> = []; + const spawnedServers: Server[] = []; + + const log: SocketWatchdogLogger = { + info: () => {}, + warn: () => {}, + error: (obj, msg) => { + loggedErrors.push({ obj, msg }); + }, + }; + + const defaultFactory = () => { + const s = createServer(); + s.on("error", () => { + /* tests don't care; suppress */ + }); + spawnedServers.push(s); + return s; + }; + + const watchdog = new SocketWatchdog({ + socketPath, + intervalMs: opts.intervalMs ?? 0, + getServer: opts.getServerOverride ?? (() => serverRef.current), + createServer: opts.createServerOverride ?? defaultFactory, + onRebind: (newServer, oldServer) => { + rebinds.push({ newServer, oldServer }); + serverRef.current = newServer; + // Mirror gateway behavior: close old server gracefully so its + // accept-loop drains. Close errors are not the watchdog's concern. + oldServer.close(() => { + /* drained */ + }); + }, + log, + }); + + return { watchdog, serverRef, rebinds, log, loggedErrors, spawnedServers }; +} + +/** + * Spin up a real listening server and install it into the harness. Returns + * once the kernel reports the socket file present on disk. 
+ */ +async function startInitialServer(harness: TestHarness): Promise<Server> { + const server = createServer(); + server.on("error", () => { + /* ignore */ + }); + harness.spawnedServers.push(server); + await new Promise<void>((resolve, reject) => { + server.once("error", reject); + server.once("listening", () => resolve()); + server.listen(socketPath); + }); + harness.serverRef.current = server; + return server; +} + +function connectClient(path: string): Promise<Socket> { + return new Promise((resolve, reject) => { + const client: Socket = createConnection(path, () => resolve(client)); + client.on("error", reject); + }); +} + +async function closeServer(server: Server): Promise<void> { + await new Promise<void>((resolve) => { + server.close(() => resolve()); + }); +} + +describe("SocketWatchdog", () => { + let harness: TestHarness | undefined; + const sockets: Socket[] = []; + + beforeEach(() => { + harness = undefined; + // Defensive: clean up any leftover socket file from a previous test + // whose afterEach didn't fully drain. + if (existsSync(socketPath)) { + try { + unlinkSync(socketPath); + } catch { + /* ignore */ + } + } + }); + + afterEach(async () => { + for (const s of sockets) { + if (!s.destroyed) s.destroy(); + } + sockets.length = 0; + + if (harness) { + harness.watchdog.stop(); + // Close every server the harness produced, regardless of how the + // test left things. Closing an already-closed server is a no-op. 
+ for (const s of harness.spawnedServers) { + try { + await closeServer(s); + } catch { + /* already closed */ + } + } + harness = undefined; + } + + if (existsSync(socketPath)) { + try { + unlinkSync(socketPath); + } catch { + /* ignore */ + } + } + }); + + test("rebindIfMissing is a no-op when the socket path exists", async () => { + harness = buildHarness({}); + await startInitialServer(harness); + + const rebound = await harness.watchdog.rebindIfMissing(); + expect(rebound).toBe(false); + expect(harness.rebinds).toHaveLength(0); + expect(existsSync(socketPath)).toBe(true); + }); + + test("rebindIfMissing is a no-op when getServer returns null", async () => { + harness = buildHarness({}); + // serverRef.current stays null. + const rebound = await harness.watchdog.rebindIfMissing(); + expect(rebound).toBe(false); + expect(harness.rebinds).toHaveLength(0); + }); + + test("rebindIfMissing recreates the listener when the path is gone", async () => { + harness = buildHarness({}); + const initial = await startInitialServer(harness); + expect(existsSync(socketPath)).toBe(true); + + // Simulate the cleanup that wipes /run/* — unlink the path while the + // listener fd is still alive in the kernel. + unlinkSync(socketPath); + expect(existsSync(socketPath)).toBe(false); + + const rebound = await harness.watchdog.rebindIfMissing(); + expect(rebound).toBe(true); + expect(existsSync(socketPath)).toBe(true); + expect(harness.rebinds).toHaveLength(1); + expect(harness.rebinds[0]!.oldServer).toBe(initial); + expect(harness.serverRef.current).toBe(harness.rebinds[0]!.newServer); + + // A fresh client can connect to the re-bound listener. 
+ const client = await connectClient(socketPath); + sockets.push(client); + expect(client.destroyed).toBe(false); + }); + + test("connected clients survive a rebind", async () => { + harness = buildHarness({}); + await startInitialServer(harness); + + const survivor = await connectClient(socketPath); + sockets.push(survivor); + expect(survivor.destroyed).toBe(false); + + unlinkSync(socketPath); + const rebound = await harness.watchdog.rebindIfMissing(); + expect(rebound).toBe(true); + + // Give the close-callback a moment to settle without churning the EL. + await new Promise((r) => setTimeout(r, 10)); + expect(survivor.destroyed).toBe(false); + }); + + test("rebindIfMissing aborts when getServer changes mid-listen (shutdown race)", async () => { + // Drive the race deterministically by mutating what getServer returns + // between its first call (precondition check) and its second call + // (post-listen race guard). + const initial = createServer(); + initial.on("error", () => {}); + await new Promise((r) => { + initial.once("listening", () => r()); + initial.listen(socketPath); + }); + + let getServerCalls = 0; + const rebinds: Array<{ newServer: Server; oldServer: Server }> = []; + const spawnedNewServers: Server[] = []; + + const watchdog = new SocketWatchdog({ + socketPath, + intervalMs: 0, + getServer: () => { + getServerCalls++; + // First call: precondition — initialServer is still around. + // Subsequent calls (race guard): null, simulating stop(). + return getServerCalls === 1 ? 
initial : null; + }, + createServer: () => { + const s = createServer(); + s.on("error", () => {}); + spawnedNewServers.push(s); + return s; + }, + onRebind: (n, o) => { + rebinds.push({ newServer: n, oldServer: o }); + }, + log: { info: () => {}, warn: () => {}, error: () => {} }, + }); + + unlinkSync(socketPath); + expect(existsSync(socketPath)).toBe(false); + + const rebound = await watchdog.rebindIfMissing(); + expect(rebound).toBe(false); + expect(rebinds).toHaveLength(0); + // The race guard should have unlinked the path the discarded server + // recreated, so a future start() doesn't see a phantom listener. + expect(existsSync(socketPath)).toBe(false); + // getServer was called at least twice — once for precondition, once + // for the race guard. + expect(getServerCalls).toBeGreaterThanOrEqual(2); + + // Cleanup: initial is still listening on the unlinked path; close it. + await closeServer(initial); + for (const s of spawnedNewServers) { + try { + await closeServer(s); + } catch { + /* already closed by race guard */ + } + } + }); + + test("rebindIfMissing returns false and logs when listen() rejects", async () => { + // Provide a factory whose listen() always errors, so rebindIfMissing + // hits the catch branch. + const initial = createServer(); + initial.on("error", () => {}); + await new Promise((r) => { + initial.once("listening", () => r()); + initial.listen(socketPath); + }); + + const rebinds: Array<{ newServer: Server; oldServer: Server }> = []; + const loggedErrors: Array<{ obj: object; msg?: string }> = []; + const failingFactory = () => { + const s = createServer(); + s.on("error", () => {}); + // Replace listen to immediately error. 
+ const realListen = s.listen.bind(s); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (s as any).listen = (_path: string) => { + queueMicrotask(() => s.emit("error", new Error("simulated EADDRINUSE"))); + return s; + }; + // Keep realListen reference alive so TS doesn't complain + void realListen; + return s; + }; + + const watchdog = new SocketWatchdog({ + socketPath, + intervalMs: 0, + getServer: () => initial, + createServer: failingFactory, + onRebind: (n, o) => rebinds.push({ newServer: n, oldServer: o }), + log: { + info: () => {}, + warn: () => {}, + error: (obj, msg) => loggedErrors.push({ obj, msg }), + }, + }); + + unlinkSync(socketPath); + const rebound = await watchdog.rebindIfMissing(); + expect(rebound).toBe(false); + expect(rebinds).toHaveLength(0); + expect(loggedErrors.length).toBeGreaterThan(0); + + await closeServer(initial); + }); + + test("watchdog timer catches synchronous rebind errors so unhandled rejections don't escape", async () => { + // createServer factory throws synchronously — simulates EACCES on + // mkdir / a broken factory dependency. + const throwingFactory = () => { + throw new Error("boom — synchronous factory failure"); + }; + + const initial = createServer(); + initial.on("error", () => {}); + await new Promise((r) => { + initial.once("listening", () => r()); + initial.listen(socketPath); + }); + + const loggedErrors: Array<{ obj: object; msg?: string }> = []; + const watchdog = new SocketWatchdog({ + socketPath, + intervalMs: 5, + getServer: () => initial, + createServer: throwingFactory, + onRebind: () => {}, + log: { + info: () => {}, + warn: () => {}, + error: (obj, msg) => loggedErrors.push({ obj, msg }), + }, + }); + + unlinkSync(socketPath); + + const seenRejections: unknown[] = []; + const onRejection = (reason: unknown) => seenRejections.push(reason); + process.on("unhandledRejection", onRejection); + + try { + watchdog.start(); + // Let the timer fire several times. 
+ await new Promise((r) => setTimeout(r, 30)); + watchdog.stop(); + } finally { + process.off("unhandledRejection", onRejection); + } + + expect(seenRejections).toHaveLength(0); + expect(loggedErrors.length).toBeGreaterThan(0); + + await closeServer(initial); + }); + + test("start() polls and rebinds without manual ticking", async () => { + harness = buildHarness({ intervalMs: 10 }); + await startInitialServer(harness); + harness.watchdog.start(); + + unlinkSync(socketPath); + + // Wait up to 500ms for the timer to recover. + const deadline = Date.now() + 500; + while (harness.rebinds.length === 0 && Date.now() < deadline) { + await new Promise((r) => setTimeout(r, 5)); + } + + expect(harness.rebinds).toHaveLength(1); + expect(existsSync(socketPath)).toBe(true); + }); + + test("stop() prevents future rebinds from firing", async () => { + harness = buildHarness({ intervalMs: 10 }); + await startInitialServer(harness); + harness.watchdog.start(); + + // First recovery cycle. + unlinkSync(socketPath); + let deadline = Date.now() + 500; + while (harness.rebinds.length < 1 && Date.now() < deadline) { + await new Promise((r) => setTimeout(r, 5)); + } + expect(harness.rebinds).toHaveLength(1); + + harness.watchdog.stop(); + const stoppedAt = harness.rebinds.length; + + // Unlink again. Wait three intervals; no new rebind should appear. + unlinkSync(socketPath); + await new Promise((r) => setTimeout(r, 50)); + expect(harness.rebinds).toHaveLength(stoppedAt); + expect(existsSync(socketPath)).toBe(false); + }); +}); diff --git a/packages/ipc-server-utils/src/socket-watchdog.ts b/packages/ipc-server-utils/src/socket-watchdog.ts new file mode 100644 index 00000000000..270c3ce16c8 --- /dev/null +++ b/packages/ipc-server-utils/src/socket-watchdog.ts @@ -0,0 +1,221 @@ +/** + * Resilience helper for Unix-domain-socket IPC servers: re-binds the + * listening socket when its on-disk path entry has been removed (e.g. by a + * tmpfs sweep or rogue cleanup of `/run/*`). 
+ * + * Existing connected sockets survive the re-bind because the kernel keeps + * connection inodes alive independently of the listener path; only new + * `connect()` calls require the path to exist. + * + * Consumers wire their `Server` reference into the watchdog via callbacks + * rather than passing the server directly so the watchdog can guard against + * shutdown/restart races mid-rebind. + */ + +import { existsSync, mkdirSync, unlinkSync } from "node:fs"; +import type { Server } from "node:net"; +import { dirname } from "node:path"; + +/** + * Minimal logger surface (pino-compatible). Each method receives a context + * object plus an optional human-readable message. + */ +export interface SocketWatchdogLogger { + info(obj: object, msg?: string): void; + warn(obj: object, msg?: string): void; + error(obj: object, msg?: string): void; +} + +export interface SocketWatchdogOptions { + /** Absolute path to the Unix socket file the consumer is listening on. */ + socketPath: string; + /** + * How often to stat the socket path. Set to `0` to disable. Defaults to + * 5000ms. + */ + intervalMs?: number; + /** + * Returns the consumer's current listening server. The watchdog uses this + * both as a precondition (no rebind when null) and as a generation marker + * to detect shutdown/restart races mid-rebind. + */ + getServer: () => Server | null; + /** + * Factory for a fresh listening Server. Called by the watchdog when a + * rebind is needed; the watchdog drives `.listen(socketPath)` and waits + * for the `listening` event before installing. + */ + createServer: () => Server; + /** + * Invoked when a rebind succeeds. The consumer is responsible for + * swapping its primary server reference to `newServer` and disposing of + * `oldServer` (typically by tracking it as a legacy listener while + * in-flight clients drain, then closing it). + */ + onRebind: (newServer: Server, oldServer: Server) => void; + /** Pino-compatible logger. 
*/ + log: SocketWatchdogLogger; +} + +const DEFAULT_INTERVAL_MS = 5000; + +/** + * Ensure the directory containing `socketPath` exists. Created with mode + * `0o700` so a freshly-spawned dir on a tmpfs mount doesn't leak the IPC + * surface to other UIDs. Existing directories keep their permissions — + * `mkdir` only applies the mode to directories it creates. + */ +export function ensureSocketDir(socketPath: string): void { + const socketDir = dirname(socketPath); + if (!existsSync(socketDir)) { + mkdirSync(socketDir, { recursive: true, mode: 0o700 }); + } +} + +/** + * Watchdog that periodically stats a Unix socket file and re-binds the + * listener when the path has been removed. + * + * Lifecycle: + * - Construct with the consumer's callbacks. + * - Call {@link start} after the consumer's initial `listen()` succeeds. + * - Call {@link stop} during shutdown (before closing the server) to halt + * polling; an in-flight rebind is discarded only if `getServer()` changed. + * + * The watchdog timer is `unref`-ed so it never keeps the event loop alive + * on its own. + */ +export class SocketWatchdog { + private readonly socketPath: string; + private readonly intervalMs: number; + private readonly getServer: () => Server | null; + private readonly createServer: () => Server; + private readonly onRebind: (newServer: Server, oldServer: Server) => void; + private readonly log: SocketWatchdogLogger; + + private handle: ReturnType<typeof setInterval> | null = null; + + constructor(options: SocketWatchdogOptions) { + this.socketPath = options.socketPath; + this.intervalMs = options.intervalMs ?? DEFAULT_INTERVAL_MS; + this.getServer = options.getServer; + this.createServer = options.createServer; + this.onRebind = options.onRebind; + this.log = options.log; + } + + /** + * Begin polling the socket path. No-op if `intervalMs <= 0` or the + * watchdog is already running. 
+ */ + start(): void { + if (this.intervalMs <= 0 || this.handle !== null) return; + this.handle = setInterval(() => { + // The async entry path of rebindIfMissing performs filesystem work + // (ensureSocketDir, createServer) before its inner try/catch, so a + // synchronous throw — e.g. EACCES on a read-only fs — would surface + // as an unhandled rejection on every tick. Catch here so the timer + // stays quiet on persistent failure modes. + this.rebindIfMissing().catch((err) => { + this.log.error( + { err, path: this.socketPath }, + "Watchdog rebind failed unexpectedly", + ); + }); + }, this.intervalMs); + this.handle.unref?.(); + } + + /** Stop the polling timer. Safe to call multiple times. */ + stop(): void { + if (this.handle !== null) { + clearInterval(this.handle); + this.handle = null; + } + } + + /** + * Re-bind the listening socket if its path entry is missing on disk. + * + * Public for tests so the watchdog can be exercised deterministically + * without waiting for the interval. Returns `true` when a re-bind was + * performed, `false` when the socket was already healthy, the consumer + * is not running, or a shutdown/restart raced the rebind. 
+ */ + async rebindIfMissing(): Promise<boolean> { + const initialServer = this.getServer(); + if (initialServer === null) return false; + if (existsSync(this.socketPath)) return false; + + this.log.warn( + { path: this.socketPath }, + "IPC socket path missing on disk — re-binding listener", + ); + + ensureSocketDir(this.socketPath); + + const newServer = this.createServer(); + try { + await new Promise<void>((resolve, reject) => { + const onError = (err: unknown) => { + newServer.off("listening", onListening); + reject(err); + }; + const onListening = () => { + newServer.off("error", onError); + resolve(); + }; + newServer.once("error", onError); + newServer.once("listening", onListening); + newServer.listen(this.socketPath); + }); + } catch (err) { + this.log.error( + { err, path: this.socketPath }, + "Failed to re-bind IPC socket — will retry on next watchdog tick", + ); + try { + newServer.close(); + } catch { + /* ignore */ + } + return false; + } + + // Race guard: while we were awaiting listen(), the consumer may have + // stopped, restarted, or otherwise replaced its server reference. + // Installing newServer would resurrect a listener after shutdown + // (keeping the process alive and accepting IPC again). Discard the + // new server instead. + if (this.getServer() !== initialServer) { + try { + newServer.close(); + } catch { + /* ignore */ + } + // newServer.listen() recreated the path on disk. If our listen won + // the race, the file is sitting there — clean it up so it doesn't + // shadow a future start(). 
+ if (existsSync(this.socketPath)) { + try { + unlinkSync(this.socketPath); + } catch { + /* ignore */ + } + } + this.log.warn( + { path: this.socketPath }, + "IPC server state changed during rebind — discarded new listener", + ); + return false; + } + + this.onRebind(newServer, initialServer); + + this.log.info( + { path: this.socketPath }, + "IPC socket re-bound after path loss", + ); + return true; + } +} diff --git a/packages/ipc-server-utils/tsconfig.json b/packages/ipc-server-utils/tsconfig.json new file mode 100644 index 00000000000..96a0aed5aff --- /dev/null +++ b/packages/ipc-server-utils/tsconfig.json @@ -0,0 +1,20 @@ +{ + "compilerOptions": { + "target": "ES2022", + "module": "NodeNext", + "moduleResolution": "NodeNext", + "strict": true, + "esModuleInterop": true, + "skipLibCheck": true, + "forceConsistentCasingInFileNames": true, + "resolveJsonModule": true, + "declaration": true, + "declarationMap": true, + "sourceMap": true, + "outDir": "./dist", + "rootDir": "./src", + "types": ["bun-types"] + }, + "include": ["src/**/*"], + "exclude": ["node_modules", "dist"] +}