diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index f3d6ef47f40..a2e50ddac99 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -189,6 +189,7 @@ In Docker mode (`IS_CONTAINERIZED=true`), services that need data from another s - **Trust rules**: The assistant reads/writes trust rules via the gateway's HTTP trust API. The gateway owns the filesystem copy at `/gateway-security/trust.json`. - **Credentials**: The assistant and gateway access credential CRUD via the CES HTTP API (`CES_CREDENTIAL_URL`), authenticated with `CES_SERVICE_TOKEN`. The CES owns the encryption keys at `/ces-security/`. +- **Contacts (auth/authz)**: The gateway owns `contacts` and `contact_channels` tables in its SQLite database (`/gateway-security/gateway.sqlite`). These tables store contact authentication and authorization data — who can talk to the assistant and what their channel policies are. The assistant daemon reads contact auth/authz data via IPC (`get_contact`, `list_contacts`, `get_contact_by_channel`, `get_channels_for_contact`). The assistant retains ownership of contact **context** (conversation history, memory associations, display preferences) in its own database. This separation is in progress — the gateway tables are declared and IPC handlers are wired, but endpoint cutover and data migration are not yet complete. ### Signing Key Bootstrap Protocol @@ -302,7 +303,7 @@ subgraph "Text Q&A Session" DB_TASKS["tasks"] DB_TASK_RUNS["task_runs"] DB_WORK_ITEMS["work_items"] - DB_CONTACTS["contacts"] + DB_CONTACTS["contacts
(migrating to gateway)"] end subgraph "Tracing" diff --git a/gateway/ARCHITECTURE.md b/gateway/ARCHITECTURE.md index 8fbf6b78e5a..b92b45f33eb 100644 --- a/gateway/ARCHITECTURE.md +++ b/gateway/ARCHITECTURE.md @@ -601,12 +601,23 @@ If no guardian binding exists for the channel, escalation fails closed -- the me #### SQLite Tables +**Assistant DB** (`assistant.db` — current owner, migrating to gateway): + | Table | Purpose | | --------------------------- | --------------------------------------------------------------------- | | `assistant_ingress_invites` | Invite tokens with SHA-256 hashes, expiry, use counts | | `contacts` | Contact records with role, relationship, and per-contact metadata | | `contact_channels` | Channel bindings per contact with access policy (allow/deny/escalate) | +**Gateway DB** (`gateway.sqlite` — future owner of auth/authz): + +| Table | Purpose | +| ------------------ | ---------------------------------------------------------------------- | +| `contacts` | Contact auth/authz: id, display_name, role, principal_id | +| `contact_channels` | Channel bindings with policy, status, external IDs, verification state | + +The gateway declares `contacts` and `contact_channels` tables and exposes them via IPC (`list_contacts`, `get_contact`, `get_contact_by_channel`, `get_channels_for_contact`). Endpoint cutover and data migration are in progress — the gateway will become the canonical owner once dual-writing is enabled. + #### Key Modules | Module | Purpose | @@ -616,6 +627,8 @@ If no guardian binding exists for the channel, escalation fails closed -- the me | `assistant/src/contacts/contacts-write.ts` | Contact and channel writes (upsert, policy changes, invite redemption) | | `assistant/src/daemon/handlers/config-inbox.ts` | Handlers for invite and member contracts | | `assistant/src/runtime/routes/channel-routes.ts` | ACL enforcement point -- member lookup, policy check, escalation creation | +| `gateway/src/db/contact-store.ts` | Gateway-side read-only ContactStore (prepared-statement queries) | +| `gateway/src/ipc/contact-handlers.ts` | IPC route handlers for contact reads | ### Telegram Credential Flow diff --git a/gateway/src/__tests__/ipc-contact-routes.test.ts b/gateway/src/__tests__/ipc-contact-routes.test.ts new file mode 100644 index 00000000000..a5ecd70a4a2 --- /dev/null +++ b/gateway/src/__tests__/ipc-contact-routes.test.ts @@ -0,0 +1,302 @@ +import { describe, test, expect, beforeEach, afterEach } from "bun:test"; +import { mkdirSync, rmSync, existsSync } from "node:fs"; +import { join } from "node:path"; +import { tmpdir } from "node:os"; +import { randomBytes } from "node:crypto"; +import { createConnection, type Socket } from "node:net"; +import { Database } from "bun:sqlite"; +import { GatewayIpcServer } from "../ipc/server.js"; +import { contactRoutes } from "../ipc/contact-handlers.js"; +import { ContactStore } from "../db/contact-store.js"; +import { getGatewayDb } from "../db/connection.js"; + +const testDir = join( + tmpdir(), + `vellum-ipc-contact-test-${randomBytes(6).toString("hex")}`, +); +const protectedDir = join(testDir, ".vellum", "protected"); +const socketPath = join(testDir, "gateway.sock"); + +const savedWorkspaceDir = process.env.VELLUM_WORKSPACE_DIR; +const savedGatewaySecurityDir = process.env.GATEWAY_SECURITY_DIR; + +beforeEach(() => { + process.env.VELLUM_WORKSPACE_DIR = testDir; + process.env.GATEWAY_SECURITY_DIR = protectedDir; + mkdirSync(protectedDir, { recursive: true }); +}); + +afterEach(() => { + if (savedWorkspaceDir === undefined) { + delete process.env.VELLUM_WORKSPACE_DIR; + } else { + process.env.VELLUM_WORKSPACE_DIR = savedWorkspaceDir; + } + if (savedGatewaySecurityDir === undefined) { + delete process.env.GATEWAY_SECURITY_DIR; + } else { + process.env.GATEWAY_SECURITY_DIR = savedGatewaySecurityDir; + } + try { + rmSync(testDir, { recursive: true, force: true }); + } catch { + // best effort cleanup + } +}); + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function connectClient(path: string): Promise { + return new Promise((resolve, reject) => { + const client = createConnection(path, () => resolve(client)); + client.on("error", reject); + }); +} + +function sendRequest( + client: Socket, + method: string, + params?: Record, +): Promise<{ id: string; result?: unknown; error?: string }> { + return new Promise((resolve, reject) => { + const id = randomBytes(4).toString("hex"); + let buffer = ""; + + const onData = (chunk: Buffer) => { + buffer += chunk.toString(); + const newlineIdx = buffer.indexOf("\n"); + if (newlineIdx !== -1) { + const line = buffer.slice(0, newlineIdx).trim(); + buffer = buffer.slice(newlineIdx + 1); + client.off("data", onData); + try { + resolve(JSON.parse(line)); + } catch (err) { + reject(err); + } + } + }; + + client.on("data", onData); + const msg = JSON.stringify({ id, method, params }); + client.write(msg + "\n"); + }); +} + +function seedTestData(db: Database): void { + const now = Date.now(); + + db.exec("DELETE FROM contact_channels"); + db.exec("DELETE FROM contacts"); + + db.exec( + `INSERT INTO contacts (id, display_name, role, principal_id, created_at, updated_at) + VALUES ('c1', 'Test Guardian', 'guardian', 'p1', ${now}, ${now})`, + ); + + db.exec( + `INSERT INTO contacts (id, display_name, role, principal_id, created_at, updated_at) + VALUES ('c2', 'Test Contact', 'contact', NULL, ${now}, ${now})`, + ); + + db.exec( + `INSERT INTO contact_channels (id, contact_id, type, address, is_primary, external_user_id, external_chat_id, status, policy, interaction_count, created_at) + VALUES ('ch1', 'c1', 'telegram', 'test-tg-user', 1, 'tg-fake-001', 'chat-fake-001', 'active', 'allow', 5, ${now})`, + ); + + db.exec( + `INSERT INTO contact_channels (id, contact_id, type, address, is_primary, external_user_id, external_chat_id, status, policy, interaction_count, created_at) + VALUES ('ch2', 'c1', 'slack', 'test-slack-user', 0, 'UFAKE00001', 'DFAKE00001', 'active', 'allow', 10, ${now})`, + ); + + db.exec( + `INSERT INTO contact_channels (id, contact_id, type, address, is_primary, external_user_id, external_chat_id, status, policy, interaction_count, created_at) + VALUES ('ch3', 'c2', 'email', 'test@example.com', 1, NULL, NULL, 'unverified', 'escalate', 0, ${now})`, + ); +} + +// --------------------------------------------------------------------------- +// ContactStore unit tests +// --------------------------------------------------------------------------- + +describe("ContactStore", () => { + test("listContacts returns all contacts", () => { + const db = getGatewayDb(); + seedTestData(db); + const store = new ContactStore(db); + + const contacts = store.listContacts(); + expect(contacts).toHaveLength(2); + expect(contacts.map((c) => c.id).sort()).toEqual(["c1", "c2"]); + }); + + test("getContact returns a single contact", () => { + const db = getGatewayDb(); + seedTestData(db); + const store = new ContactStore(db); + + const contact = store.getContact("c1"); + expect(contact).not.toBeNull(); + expect(contact!.displayName).toBe("Test Guardian"); + expect(contact!.role).toBe("guardian"); + }); + + test("getContact returns null for unknown id", () => { + const db = getGatewayDb(); + seedTestData(db); + const store = new ContactStore(db); + + expect(store.getContact("nonexistent")).toBeNull(); + }); + + test("getContactByChannel finds contact by channel type and external user id", () => { + const db = getGatewayDb(); + seedTestData(db); + const store = new ContactStore(db); + + const contact = store.getContactByChannel("telegram", "tg-fake-001"); + expect(contact).not.toBeNull(); + expect(contact!.id).toBe("c1"); + }); + + test("getContactByChannel returns null for unknown channel", () => { + const db = getGatewayDb(); + seedTestData(db); + const store = new ContactStore(db); + + expect(store.getContactByChannel("telegram", "nonexistent")).toBeNull(); + }); + + test("getChannelsForContact returns all channels for a contact", () => { + const db = getGatewayDb(); + seedTestData(db); + const store = new ContactStore(db); + + const channels = store.getChannelsForContact("c1"); + expect(channels).toHaveLength(2); + expect(channels.map((ch) => ch.type).sort()).toEqual(["slack", "telegram"]); + }); + + test("getChannelsForContact returns empty array for unknown contact", () => { + const db = getGatewayDb(); + seedTestData(db); + const store = new ContactStore(db); + + expect(store.getChannelsForContact("nonexistent")).toHaveLength(0); + }); + + test("contact_channels cascade deletes when contact is deleted", () => { + const db = getGatewayDb(); + seedTestData(db); + const store = new ContactStore(db); + + expect(store.getChannelsForContact("c1")).toHaveLength(2); + db.exec("DELETE FROM contacts WHERE id = 'c1'"); + expect(store.getChannelsForContact("c1")).toHaveLength(0); + }); +}); + +// --------------------------------------------------------------------------- +// IPC route tests +// --------------------------------------------------------------------------- + +describe("IPC contact routes", () => { + let server: InstanceType; + let client: Socket; + + beforeEach(async () => { + if (existsSync(socketPath)) { + rmSync(socketPath); + } + }); + + afterEach(() => { + client?.destroy(); + server?.stop(); + }); + + async function startServerAndConnect(): Promise { + server = new GatewayIpcServer([...contactRoutes]); + (server as unknown as { socketPath: string }).socketPath = socketPath; + server.start(); + await new Promise((resolve) => setTimeout(resolve, 50)); + client = await connectClient(socketPath); + } + + test("list_contacts returns seeded contacts via IPC", async () => { + const db = getGatewayDb(); + seedTestData(db); + + await startServerAndConnect(); + const res = await sendRequest(client, "list_contacts"); + + expect(res.error).toBeUndefined(); + const contacts = res.result as { id: string; displayName: string }[]; + expect(contacts).toHaveLength(2); + }); + + test("get_contact returns a specific contact via IPC", async () => { + const db = getGatewayDb(); + seedTestData(db); + + await startServerAndConnect(); + const res = await sendRequest(client, "get_contact", { contactId: "c1" }); + + expect(res.error).toBeUndefined(); + const contact = res.result as { id: string; displayName: string }; + expect(contact.id).toBe("c1"); + expect(contact.displayName).toBe("Test Guardian"); + }); + + test("get_contact returns null for unknown contact", async () => { + const db = getGatewayDb(); + seedTestData(db); + + await startServerAndConnect(); + const res = await sendRequest(client, "get_contact", { + contactId: "nonexistent", + }); + + expect(res.error).toBeUndefined(); + expect(res.result).toBeNull(); + }); + + test("get_contact_by_channel resolves contact from channel info", async () => { + const db = getGatewayDb(); + seedTestData(db); + + await startServerAndConnect(); + const res = await sendRequest(client, "get_contact_by_channel", { + channelType: "slack", + externalUserId: "UFAKE00001", + }); + + expect(res.error).toBeUndefined(); + const contact = res.result as { id: string }; + expect(contact.id).toBe("c1"); + }); + + test("get_channels_for_contact returns channel list", async () => { + const db = getGatewayDb(); + seedTestData(db); + + await startServerAndConnect(); + const res = await sendRequest(client, "get_channels_for_contact", { + contactId: "c1", + }); + + expect(res.error).toBeUndefined(); + const channels = res.result as { id: string; type: string }[]; + expect(channels).toHaveLength(2); + }); + + test("get_contact validates params", async () => { + await startServerAndConnect(); + const res = await sendRequest(client, "get_contact", {}); + + expect(res.error).toBeDefined(); + expect(res.error).toContain("Invalid params"); + }); +}); diff --git a/gateway/src/db/connection.ts b/gateway/src/db/connection.ts index b873a250d38..40c6d77434b 100644 --- a/gateway/src/db/connection.ts +++ b/gateway/src/db/connection.ts @@ -87,4 +87,49 @@ function migrate(db: Database): void { ran_at INTEGER NOT NULL ) `); + + db.exec(` + CREATE TABLE IF NOT EXISTS contacts ( + id TEXT PRIMARY KEY, + display_name TEXT NOT NULL, + role TEXT NOT NULL DEFAULT 'contact', + principal_id TEXT, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL + ) + `); + + db.exec(` + CREATE TABLE IF NOT EXISTS contact_channels ( + id TEXT PRIMARY KEY, + contact_id TEXT NOT NULL REFERENCES contacts(id) ON DELETE CASCADE, + type TEXT NOT NULL, + address TEXT NOT NULL, + is_primary INTEGER NOT NULL DEFAULT 0, + external_user_id TEXT, + external_chat_id TEXT, + status TEXT NOT NULL DEFAULT 'unverified', + policy TEXT NOT NULL DEFAULT 'allow', + verified_at INTEGER, + verified_via TEXT, + invite_id TEXT, + revoked_reason TEXT, + blocked_reason TEXT, + last_seen_at INTEGER, + interaction_count INTEGER NOT NULL DEFAULT 0, + last_interaction INTEGER, + created_at INTEGER NOT NULL, + updated_at INTEGER + ) + `); + + db.exec(` + CREATE INDEX IF NOT EXISTS idx_contact_channels_type_ext_user + ON contact_channels(type, external_user_id) + `); + + db.exec(` + CREATE INDEX IF NOT EXISTS idx_contact_channels_type_ext_chat + ON contact_channels(type, external_chat_id) + `); } diff --git a/gateway/src/db/contact-store.ts b/gateway/src/db/contact-store.ts new file mode 100644 index 00000000000..404208ab837 --- /dev/null +++ b/gateway/src/db/contact-store.ts @@ -0,0 +1,156 @@ +import type { Database, Statement } from "bun:sqlite"; +import { getGatewayDb } from "./connection.js"; + +export type Contact = { + id: string; + displayName: string; + role: string; + principalId: string | null; + createdAt: number; + updatedAt: number; +}; + +export type ContactChannel = { + id: string; + contactId: string; + type: string; + address: string; + isPrimary: boolean; + externalUserId: string | null; + externalChatId: string | null; + status: string; + policy: string; + verifiedAt: number | null; + verifiedVia: string | null; + inviteId: string | null; + revokedReason: string | null; + blockedReason: string | null; + lastSeenAt: number | null; + interactionCount: number; + lastInteraction: number | null; + createdAt: number; + updatedAt: number | null; +}; + +type ContactRow = { + id: string; + display_name: string; + role: string; + principal_id: string | null; + created_at: number; + updated_at: number; +}; + +type ContactChannelRow = { + id: string; + contact_id: string; + type: string; + address: string; + is_primary: number; + external_user_id: string | null; + external_chat_id: string | null; + status: string; + policy: string; + verified_at: number | null; + verified_via: string | null; + invite_id: string | null; + revoked_reason: string | null; + blocked_reason: string | null; + last_seen_at: number | null; + interaction_count: number; + last_interaction: number | null; + created_at: number; + updated_at: number | null; +}; + +function toContact(row: ContactRow): Contact { + return { + id: row.id, + displayName: row.display_name, + role: row.role, + principalId: row.principal_id, + createdAt: row.created_at, + updatedAt: row.updated_at, + }; +} + +function toContactChannel(row: ContactChannelRow): ContactChannel { + return { + id: row.id, + contactId: row.contact_id, + type: row.type, + address: row.address, + isPrimary: row.is_primary === 1, + externalUserId: row.external_user_id, + externalChatId: row.external_chat_id, + status: row.status, + policy: row.policy, + verifiedAt: row.verified_at, + verifiedVia: row.verified_via, + inviteId: row.invite_id, + revokedReason: row.revoked_reason, + blockedReason: row.blocked_reason, + lastSeenAt: row.last_seen_at, + interactionCount: row.interaction_count, + lastInteraction: row.last_interaction, + createdAt: row.created_at, + updatedAt: row.updated_at, + }; +} + +export class ContactStore { + private db: Database; + + private _getContact: Statement | null = null; + private _listContacts: Statement | null = null; + private _getContactByChannel: Statement | null = null; + private _getChannelsForContact: Statement | null = null; + + constructor(db?: Database) { + this.db = db ?? getGatewayDb(); + } + + getContact(contactId: string): Contact | null { + const stmt = + this._getContact ?? + (this._getContact = this.db.prepare( + "SELECT * FROM contacts WHERE id = ?", + )); + const row = stmt.get(contactId) as ContactRow | null; + return row ? toContact(row) : null; + } + + listContacts(): Contact[] { + const stmt = + this._listContacts ?? + (this._listContacts = this.db.prepare( + "SELECT * FROM contacts ORDER BY created_at DESC", + )); + return (stmt.all() as ContactRow[]).map(toContact); + } + + getContactByChannel( + channelType: string, + externalUserId: string, + ): Contact | null { + const stmt = + this._getContactByChannel ?? + (this._getContactByChannel = this.db.prepare( + `SELECT c.* FROM contacts c + JOIN contact_channels cc ON cc.contact_id = c.id + WHERE cc.type = ? AND cc.external_user_id = ? + LIMIT 1`, + )); + const row = stmt.get(channelType, externalUserId) as ContactRow | null; + return row ? toContact(row) : null; + } + + getChannelsForContact(contactId: string): ContactChannel[] { + const stmt = + this._getChannelsForContact ?? + (this._getChannelsForContact = this.db.prepare( + "SELECT * FROM contact_channels WHERE contact_id = ? ORDER BY created_at ASC", + )); + return (stmt.all(contactId) as ContactChannelRow[]).map(toContactChannel); + } +} diff --git a/gateway/src/index.ts b/gateway/src/index.ts index 910f6b36c27..ddbe0b5d0a1 100644 --- a/gateway/src/index.ts +++ b/gateway/src/index.ts @@ -120,6 +120,7 @@ import { isNewCommand, handleNewCommand } from "./webhook-pipeline.js"; import { reconcileTelegramWebhook } from "./telegram/webhook-manager.js"; import { registerEmailCallbackRoute } from "./email/register-callback.js"; import { GatewayIpcServer } from "./ipc/server.js"; +import { contactRoutes } from "./ipc/contact-handlers.js"; import { featureFlagRoutes, getMergedFeatureFlags, @@ -1773,7 +1774,10 @@ async function main() { configFileWatcher.start(); // ── IPC server ── - const ipcServer = new GatewayIpcServer([...featureFlagRoutes]); + const ipcServer = new GatewayIpcServer([ + ...featureFlagRoutes, + ...contactRoutes, + ]); ipcServer.start(); const featureFlagWatcher = new FeatureFlagWatcher(); diff --git a/gateway/src/ipc/contact-handlers.ts b/gateway/src/ipc/contact-handlers.ts new file mode 100644 index 00000000000..5ab158c0d42 --- /dev/null +++ b/gateway/src/ipc/contact-handlers.ts @@ -0,0 +1,65 @@ +/** + * IPC route definitions for contact reads. + * + * Exposes gateway-owned contact data (auth/authz) to the assistant + * daemon over the IPC socket. All methods are read-only. + */ + +import { z } from "zod"; + +import { ContactStore } from "../db/contact-store.js"; +import type { IpcRoute } from "./server.js"; + +let store: ContactStore | null = null; + +function getStore(): ContactStore { + if (!store) { + store = new ContactStore(); + } + return store; +} + +const GetContactParamsSchema = z.object({ + contactId: z.string(), +}); + +const GetContactByChannelParamsSchema = z.object({ + channelType: z.string(), + externalUserId: z.string(), +}); + +const GetChannelsForContactParamsSchema = z.object({ + contactId: z.string(), +}); + +export const contactRoutes: IpcRoute[] = [ + { + method: "list_contacts", + handler: () => getStore().listContacts(), + }, + { + method: "get_contact", + schema: GetContactParamsSchema, + handler: (params?: Record) => { + const contactId = params?.contactId as string; + return getStore().getContact(contactId); + }, + }, + { + method: "get_contact_by_channel", + schema: GetContactByChannelParamsSchema, + handler: (params?: Record) => { + const channelType = params?.channelType as string; + const externalUserId = params?.externalUserId as string; + return getStore().getContactByChannel(channelType, externalUserId); + }, + }, + { + method: "get_channels_for_contact", + schema: GetChannelsForContactParamsSchema, + handler: (params?: Record) => { + const contactId = params?.contactId as string; + return getStore().getChannelsForContact(contactId); + }, + }, +];