Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions assistant/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ COPY packages/service-contracts ./packages/service-contracts
COPY packages/credential-storage ./packages/credential-storage
COPY packages/egress-proxy ./packages/egress-proxy
COPY packages/gateway-client ./packages/gateway-client
COPY packages/ipc-server-utils ./packages/ipc-server-utils
COPY packages/skill-host-contracts ./packages/skill-host-contracts
COPY packages/slack-text ./packages/slack-text
COPY packages/twilio-client ./packages/twilio-client
Expand Down
3 changes: 3 additions & 0 deletions assistant/bun.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions assistant/knip.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"@vellumai/credential-storage",
"@vellumai/egress-proxy",
"@vellumai/gateway-client",
"@vellumai/ipc-server-utils",
"@vellumai/service-contracts",
"@vellumai/slack-text",
"@vellumai/twilio-client",
Expand Down
2 changes: 2 additions & 0 deletions assistant/package.json
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 @vellumai/ipc-server-utils missing from bundledDependencies breaks npm pack

The new @vellumai/ipc-server-utils dependency uses a file: specifier (file:../packages/ipc-server-utils) in assistant/package.json:47 but is not listed in the bundledDependencies array. Every other @vellumai/* file-specifier dependency is in bundledDependencies. The prepack script (scripts/prepack-bundled-deps.mjs:25-34) explicitly validates this invariant — it iterates all file: dependencies, checks membership in bundledDependencies, and calls process.exit(1) if any are missing. This will cause the assistant's prepack step and consequently npm pack / npm publish to fail.

(Refers to lines 76-85)

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
"@vellumai/credential-storage": "file:../packages/credential-storage",
"@vellumai/egress-proxy": "file:../packages/egress-proxy",
"@vellumai/gateway-client": "file:../packages/gateway-client",
"@vellumai/ipc-server-utils": "file:../packages/ipc-server-utils",
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Copy new local package into assistant Docker build context

Adding @vellumai/ipc-server-utils as a file:../packages/... dependency here requires /app/packages/ipc-server-utils to exist before bun install --frozen-lockfile runs in the assistant image build. assistant/Dockerfile currently copies only ces-client, service-contracts, credential-storage, egress-proxy, gateway-client, skill-host-contracts, slack-text, and twilio-client (lines 20–27), so Docker builds that install assistant dependencies will fail to resolve this local package.

Useful? React with 👍 / 👎.

"@vellumai/service-contracts": "file:../packages/service-contracts",
"@vellumai/skill-host-contracts": "file:../packages/skill-host-contracts",
"@vellumai/slack-text": "file:../packages/slack-text",
Expand Down Expand Up @@ -78,6 +79,7 @@
"@vellumai/service-contracts",
"@vellumai/egress-proxy",
"@vellumai/gateway-client",
"@vellumai/ipc-server-utils",
"@vellumai/skill-host-contracts",
"@vellumai/slack-text",
"@vellumai/twilio-client"
Expand Down
129 changes: 93 additions & 36 deletions assistant/src/ipc/assistant-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,13 @@
* back to a shorter deterministic path so CLI commands can still connect.
*/

import { existsSync, mkdirSync, unlinkSync } from "node:fs";
import { existsSync, unlinkSync } from "node:fs";
import { createServer, type Server, type Socket } from "node:net";
import { dirname } from "node:path";

import {
ensureSocketDir,
SocketWatchdog,
} from "@vellumai/ipc-server-utils";

import { findLocalGuardianPrincipalId } from "../runtime/local-actor-identity.js";
import { RouteError } from "../runtime/routes/errors.js";
Expand Down Expand Up @@ -130,13 +134,29 @@ function isIpcBinaryResponse(value: unknown): value is IpcBinaryResponse {
// Server
// ---------------------------------------------------------------------------

/** Optional configuration for {@link AssistantIpcServer}. */
export interface AssistantIpcServerOptions {
/**
* How often the socket-file watchdog stats the listening socket path.
* Set to `0` to disable. Defaults to {@link SocketWatchdog}'s 5000ms.
*/
watchdogIntervalMs?: number;
}

export class AssistantIpcServer {
private server: Server | null = null;
private clients = new Set<Socket>();
private methods = new Map<string, RouteDefinition["handler"]>();
private socketPath: string;
private watchdog: SocketWatchdog;
/**
* Servers whose listener path has been replaced by a re-bind. Kept around
* so already-connected sockets continue to work; closed gracefully once
* their accept loops drain.
*/
private legacyServers = new Set<Server>();

constructor() {
constructor(options?: AssistantIpcServerOptions) {
const resolution = resolveIpcSocketPath("assistant");
this.socketPath = resolution.path;
log.info(
Expand All @@ -154,62 +174,55 @@ export class AssistantIpcServer {
this.methods.set("db_proxy", (params) =>
handleDbProxy(params as unknown as DbProxyParams),
);

this.watchdog = new SocketWatchdog({
socketPath: this.socketPath,
intervalMs: options?.watchdogIntervalMs,
getServer: () => this.server,
createServer: () => this.createListeningServer(),
onRebind: (newServer, oldServer) => {
this.server = newServer;
this.legacyServers.add(oldServer);
oldServer.close(() => {
this.legacyServers.delete(oldServer);
});
},
log,
});
}

/** Start listening on the Unix domain socket. */
async start(): Promise<void> {
// Ensure the parent directory exists before listening.
const socketDir = dirname(this.socketPath);
if (!existsSync(socketDir)) {
mkdirSync(socketDir, { recursive: true, mode: 0o700 });
}
ensureSocketDir(this.socketPath);

// Probe before unlink so a second daemon can't silently orphan an active
// listener (Unix lets you unlink a still-bound socket file). See
// `ensureSocketPathFree` for the behavior matrix.
await ensureSocketPathFree(this.socketPath);

this.server = createServer((socket) => {
this.clients.add(socket);
log.debug("IPC client connected");

const reader = new IpcFrameReader(
(envelope, binary) =>
this.handleEnvelope(socket, reader, envelope, binary),
(err) => log.warn({ err }, "IPC frame read error"),
);

socket.on("data", (chunk) => {
reader.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
});

socket.on("close", () => {
this.clients.delete(socket);
log.debug("IPC client disconnected");
});

socket.on("error", (err) => {
log.warn({ err }, "IPC client socket error");
this.clients.delete(socket);
});
});

this.server.on("error", (err) => {
log.error({ err }, "Assistant IPC server error");
});

this.server = this.createListeningServer();
this.server.listen(this.socketPath, () => {
log.info({ path: this.socketPath }, "Assistant IPC server listening");
});

this.watchdog.start();
}

/** Stop the server and disconnect all clients. */
stop(): void {
this.watchdog.stop();

for (const client of this.clients) {
if (!client.destroyed) client.destroy();
}
this.clients.clear();

for (const legacy of this.legacyServers) {
legacy.close();
}
this.legacyServers.clear();

if (this.server) {
this.server.close();
this.server = null;
Expand All @@ -229,8 +242,52 @@ export class AssistantIpcServer {
return this.socketPath;
}

/**
* Re-bind the listening socket if its path entry is missing on disk.
*
* Public for tests so the watchdog can be exercised deterministically
* without waiting for the interval. Returns `true` when a re-bind was
* performed, `false` otherwise.
*/
async rebindIfMissing(): Promise<boolean> {
return this.watchdog.rebindIfMissing();
}

// ── Internal ──────────────────────────────────────────────────────────

private createListeningServer(): Server {
const server = createServer((socket) => {
this.clients.add(socket);
log.debug("IPC client connected");

const reader = new IpcFrameReader(
(envelope, binary) =>
this.handleEnvelope(socket, reader, envelope, binary),
(err) => log.warn({ err }, "IPC frame read error"),
);

socket.on("data", (chunk) => {
reader.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
});

socket.on("close", () => {
this.clients.delete(socket);
log.debug("IPC client disconnected");
});

socket.on("error", (err) => {
log.warn({ err }, "IPC client socket error");
this.clients.delete(socket);
});
});

server.on("error", (err) => {
log.error({ err }, "Assistant IPC server error");
});

return server;
}

private handleEnvelope(
socket: Socket,
reader: IpcFrameReader,
Expand Down
Loading
Loading