From 464db23e475c29974e90eaa51fbb1d59f9267137 Mon Sep 17 00:00:00 2001 From: root Date: Wed, 15 Apr 2026 23:14:09 +0000 Subject: [PATCH 1/2] feat: add contacts + contact_channels tables to gateway SQLite MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Gateway cutover step 1: declare contacts and contact_channels tables in the gateway DB schema. This is the foundation for moving contact auth/authz ownership from the assistant daemon to the gateway. - contacts: mirrors assistant's contacts table (auth/authz fields only) - contact_channels: mirrors assistant's contact_channels table with same indexes (type+external_user_id, type+external_chat_id) - m0002-seed-contacts: one-time data migration that seeds both tables from assistant.db on first startup (INSERT OR IGNORE, transactional) - ContactStore: read-only store with prepared-statement queries (getContact, listContacts, getContactByChannel, getChannelsForContact) - IPC handlers: list_contacts, get_contact, get_contact_by_channel, get_channels_for_contact — wired into the gateway IPC server - Tests: ContactStore unit tests + IPC round-trip tests --- .../src/__tests__/ipc-contact-routes.test.ts | 300 ++++++++++++++++++ gateway/src/db/connection.ts | 48 +++ gateway/src/db/contact-store.ts | 165 ++++++++++ gateway/src/db/data-migrations/index.ts | 2 + .../db/data-migrations/m0002-seed-contacts.ts | 189 +++++++++++ gateway/src/index.ts | 6 +- gateway/src/ipc/contact-handlers.ts | 65 ++++ 7 files changed, 774 insertions(+), 1 deletion(-) create mode 100644 gateway/src/__tests__/ipc-contact-routes.test.ts create mode 100644 gateway/src/db/contact-store.ts create mode 100644 gateway/src/db/data-migrations/m0002-seed-contacts.ts create mode 100644 gateway/src/ipc/contact-handlers.ts diff --git a/gateway/src/__tests__/ipc-contact-routes.test.ts b/gateway/src/__tests__/ipc-contact-routes.test.ts new file mode 100644 index 00000000000..003c1ac9138 --- /dev/null +++ b/gateway/src/__tests__/ipc-contact-routes.test.ts @@ -0,0 +1,300 @@ +import { describe, test, expect, beforeEach, afterEach } from "bun:test"; +import { mkdirSync, rmSync, existsSync } from "node:fs"; +import { join } from "node:path"; +import { tmpdir } from "node:os"; +import { randomBytes } from "node:crypto"; +import { createConnection, type Socket } from "node:net"; +import { Database } from "bun:sqlite"; + +const testDir = join( + tmpdir(), + `vellum-ipc-contact-test-${randomBytes(6).toString("hex")}`, +); +const protectedDir = join(testDir, ".vellum", "protected"); +const socketPath = join(testDir, "gateway.sock"); + +const savedWorkspaceDir = process.env.VELLUM_WORKSPACE_DIR; +const savedGatewaySecurityDir = process.env.GATEWAY_SECURITY_DIR; + +beforeEach(() => { + process.env.VELLUM_WORKSPACE_DIR = testDir; + process.env.GATEWAY_SECURITY_DIR = protectedDir; + mkdirSync(protectedDir, { recursive: true }); +}); + +afterEach(() => { + if (savedWorkspaceDir === undefined) { + delete process.env.VELLUM_WORKSPACE_DIR; + } else { + process.env.VELLUM_WORKSPACE_DIR = savedWorkspaceDir; + } + if (savedGatewaySecurityDir === undefined) { + delete process.env.GATEWAY_SECURITY_DIR; + } else { + process.env.GATEWAY_SECURITY_DIR = savedGatewaySecurityDir; + } + try { + rmSync(testDir, { recursive: true, force: true }); + } catch { + // best effort cleanup + } +}); + +const { GatewayIpcServer } = await import("../ipc/server.js"); +const { contactRoutes } = await import("../ipc/contact-handlers.js"); +const { ContactStore } = await import("../db/contact-store.js"); +const { getGatewayDb } = await import("../db/connection.js"); + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function connectClient(path: string): Promise { + return new Promise((resolve, reject) => { + const client = createConnection(path, () => resolve(client)); + client.on("error", reject); + }); +} + +function sendRequest( + client: Socket, + method: string, + params?: Record, +): Promise<{ id: string; result?: unknown; error?: string }> { + return new Promise((resolve, reject) => { + const id = randomBytes(4).toString("hex"); + let buffer = ""; + + const onData = (chunk: Buffer) => { + buffer += chunk.toString(); + const newlineIdx = buffer.indexOf("\n"); + if (newlineIdx !== -1) { + const line = buffer.slice(0, newlineIdx).trim(); + buffer = buffer.slice(newlineIdx + 1); + client.off("data", onData); + try { + resolve(JSON.parse(line)); + } catch (err) { + reject(err); + } + } + }; + + client.on("data", onData); + const msg = JSON.stringify({ id, method, params }); + client.write(msg + "\n"); + }); +} + +function seedTestData(db: Database): void { + const now = Date.now(); + + db.exec( + `INSERT INTO contacts (id, display_name, notes, role, principal_id, user_file, contact_type, created_at, updated_at) + VALUES ('c1', 'Vargas', 'Founding engineer', 'guardian', 'p1', NULL, 'human', ${now}, ${now})`, + ); + + db.exec( + `INSERT INTO contacts (id, display_name, notes, role, principal_id, user_file, contact_type, created_at, updated_at) + VALUES ('c2', 'Alice', NULL, 'contact', NULL, NULL, 'human', ${now}, ${now})`, + ); + + db.exec( + `INSERT INTO contact_channels (id, contact_id, type, address, is_primary, external_user_id, external_chat_id, status, policy, interaction_count, created_at) + VALUES ('ch1', 'c1', 'telegram', 'vargas@tg', 1, 'tg-123', 'chat-456', 'active', 'allow', 5, ${now})`, + ); + + db.exec( + `INSERT INTO contact_channels (id, contact_id, type, address, is_primary, external_user_id, external_chat_id, status, policy, interaction_count, created_at) + VALUES ('ch2', 'c1', 'slack', 'U05D5EGNNMS', 0, 'U05D5EGNNMS', 'D09Q9FG277H', 'active', 'allow', 10, ${now})`, + ); + + db.exec( + `INSERT INTO contact_channels (id, contact_id, type, address, is_primary, external_user_id, external_chat_id, status, policy, interaction_count, created_at) + VALUES ('ch3', 'c2', 'email', 'alice@example.com', 1, NULL, NULL, 'unverified', 'escalate', 0, ${now})`, + ); +} + +// --------------------------------------------------------------------------- +// ContactStore unit tests +// --------------------------------------------------------------------------- + +describe("ContactStore", () => { + test("listContacts returns all contacts", () => { + const db = getGatewayDb(); + seedTestData(db); + const store = new ContactStore(db); + + const contacts = store.listContacts(); + expect(contacts).toHaveLength(2); + expect(contacts.map((c) => c.id).sort()).toEqual(["c1", "c2"]); + }); + + test("getContact returns a single contact", () => { + const db = getGatewayDb(); + seedTestData(db); + const store = new ContactStore(db); + + const contact = store.getContact("c1"); + expect(contact).not.toBeNull(); + expect(contact!.displayName).toBe("Vargas"); + expect(contact!.role).toBe("guardian"); + }); + + test("getContact returns null for unknown id", () => { + const db = getGatewayDb(); + seedTestData(db); + const store = new ContactStore(db); + + expect(store.getContact("nonexistent")).toBeNull(); + }); + + test("getContactByChannel finds contact by channel type and external user id", () => { + const db = getGatewayDb(); + seedTestData(db); + const store = new ContactStore(db); + + const contact = store.getContactByChannel("telegram", "tg-123"); + expect(contact).not.toBeNull(); + expect(contact!.id).toBe("c1"); + }); + + test("getContactByChannel returns null for unknown channel", () => { + const db = getGatewayDb(); + seedTestData(db); + const store = new ContactStore(db); + + expect(store.getContactByChannel("telegram", "nonexistent")).toBeNull(); + }); + + test("getChannelsForContact returns all channels for a contact", () => { + const db = getGatewayDb(); + seedTestData(db); + const store = new ContactStore(db); + + const channels = store.getChannelsForContact("c1"); + expect(channels).toHaveLength(2); + expect(channels.map((ch) => ch.type).sort()).toEqual(["slack", "telegram"]); + }); + + test("getChannelsForContact returns empty array for unknown contact", () => { + const db = getGatewayDb(); + seedTestData(db); + const store = new ContactStore(db); + + expect(store.getChannelsForContact("nonexistent")).toHaveLength(0); + }); + + test("contact_channels cascade deletes when contact is deleted", () => { + const db = getGatewayDb(); + seedTestData(db); + const store = new ContactStore(db); + + expect(store.getChannelsForContact("c1")).toHaveLength(2); + db.exec("DELETE FROM contacts WHERE id = 'c1'"); + expect(store.getChannelsForContact("c1")).toHaveLength(0); + }); +}); + +// --------------------------------------------------------------------------- +// IPC route tests +// --------------------------------------------------------------------------- + +describe("IPC contact routes", () => { + let server: InstanceType; + let client: Socket; + + beforeEach(async () => { + if (existsSync(socketPath)) { + rmSync(socketPath); + } + }); + + afterEach(() => { + client?.destroy(); + server?.stop(); + }); + + async function startServerAndConnect(): Promise { + server = new GatewayIpcServer([...contactRoutes]); + (server as unknown as { socketPath: string }).socketPath = socketPath; + server.start(); + await new Promise((resolve) => setTimeout(resolve, 50)); + client = await connectClient(socketPath); + } + + test("list_contacts returns seeded contacts via IPC", async () => { + const db = getGatewayDb(); + seedTestData(db); + + await startServerAndConnect(); + const res = await sendRequest(client, "list_contacts"); + + expect(res.error).toBeUndefined(); + const contacts = res.result as { id: string; displayName: string }[]; + expect(contacts).toHaveLength(2); + }); + + test("get_contact returns a specific contact via IPC", async () => { + const db = getGatewayDb(); + seedTestData(db); + + await startServerAndConnect(); + const res = await sendRequest(client, "get_contact", { contactId: "c1" }); + + expect(res.error).toBeUndefined(); + const contact = res.result as { id: string; displayName: string }; + expect(contact.id).toBe("c1"); + expect(contact.displayName).toBe("Vargas"); + }); + + test("get_contact returns null for unknown contact", async () => { + const db = getGatewayDb(); + seedTestData(db); + + await startServerAndConnect(); + const res = await sendRequest(client, "get_contact", { + contactId: "nonexistent", + }); + + expect(res.error).toBeUndefined(); + expect(res.result).toBeNull(); + }); + + test("get_contact_by_channel resolves contact from channel info", async () => { + const db = getGatewayDb(); + seedTestData(db); + + await startServerAndConnect(); + const res = await sendRequest(client, "get_contact_by_channel", { + channelType: "slack", + externalUserId: "U05D5EGNNMS", + }); + + expect(res.error).toBeUndefined(); + const contact = res.result as { id: string }; + expect(contact.id).toBe("c1"); + }); + + test("get_channels_for_contact returns channel list", async () => { + const db = getGatewayDb(); + seedTestData(db); + + await startServerAndConnect(); + const res = await sendRequest(client, "get_channels_for_contact", { + contactId: "c1", + }); + + expect(res.error).toBeUndefined(); + const channels = res.result as { id: string; type: string }[]; + expect(channels).toHaveLength(2); + }); + + test("get_contact validates params", async () => { + await startServerAndConnect(); + const res = await sendRequest(client, "get_contact", {}); + + expect(res.error).toBeDefined(); + expect(res.error).toContain("Invalid params"); + }); +}); diff --git a/gateway/src/db/connection.ts b/gateway/src/db/connection.ts index b873a250d38..4abc2d6e4b4 100644 --- a/gateway/src/db/connection.ts +++ b/gateway/src/db/connection.ts @@ -87,4 +87,52 @@ function migrate(db: Database): void { ran_at INTEGER NOT NULL ) `); + + db.exec(` + CREATE TABLE IF NOT EXISTS contacts ( + id TEXT PRIMARY KEY, + display_name TEXT NOT NULL, + notes TEXT, + role TEXT NOT NULL DEFAULT 'contact', + principal_id TEXT, + user_file TEXT, + contact_type TEXT NOT NULL DEFAULT 'human', + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL + ) + `); + + db.exec(` + CREATE TABLE IF NOT EXISTS contact_channels ( + id TEXT PRIMARY KEY, + contact_id TEXT NOT NULL REFERENCES contacts(id) ON DELETE CASCADE, + type TEXT NOT NULL, + address TEXT NOT NULL, + is_primary INTEGER NOT NULL DEFAULT 0, + external_user_id TEXT, + external_chat_id TEXT, + status TEXT NOT NULL DEFAULT 'unverified', + policy TEXT NOT NULL DEFAULT 'allow', + verified_at INTEGER, + verified_via TEXT, + invite_id TEXT, + revoked_reason TEXT, + blocked_reason TEXT, + last_seen_at INTEGER, + interaction_count INTEGER NOT NULL DEFAULT 0, + last_interaction INTEGER, + created_at INTEGER NOT NULL, + updated_at INTEGER + ) + `); + + db.exec(` + CREATE INDEX IF NOT EXISTS idx_contact_channels_type_ext_user + ON contact_channels(type, external_user_id) + `); + + db.exec(` + CREATE INDEX IF NOT EXISTS idx_contact_channels_type_ext_chat + ON contact_channels(type, external_chat_id) + `); } diff --git a/gateway/src/db/contact-store.ts b/gateway/src/db/contact-store.ts new file mode 100644 index 00000000000..19ebc0f35e9 --- /dev/null +++ b/gateway/src/db/contact-store.ts @@ -0,0 +1,165 @@ +import type { Database, Statement } from "bun:sqlite"; +import { getGatewayDb } from "./connection.js"; + +export type Contact = { + id: string; + displayName: string; + notes: string | null; + role: string; + principalId: string | null; + userFile: string | null; + contactType: string; + createdAt: number; + updatedAt: number; +}; + +export type ContactChannel = { + id: string; + contactId: string; + type: string; + address: string; + isPrimary: boolean; + externalUserId: string | null; + externalChatId: string | null; + status: string; + policy: string; + verifiedAt: number | null; + verifiedVia: string | null; + inviteId: string | null; + revokedReason: string | null; + blockedReason: string | null; + lastSeenAt: number | null; + interactionCount: number; + lastInteraction: number | null; + createdAt: number; + updatedAt: number | null; +}; + +type ContactRow = { + id: string; + display_name: string; + notes: string | null; + role: string; + principal_id: string | null; + user_file: string | null; + contact_type: string; + created_at: number; + updated_at: number; +}; + +type ContactChannelRow = { + id: string; + contact_id: string; + type: string; + address: string; + is_primary: number; + external_user_id: string | null; + external_chat_id: string | null; + status: string; + policy: string; + verified_at: number | null; + verified_via: string | null; + invite_id: string | null; + revoked_reason: string | null; + blocked_reason: string | null; + last_seen_at: number | null; + interaction_count: number; + last_interaction: number | null; + created_at: number; + updated_at: number | null; +}; + +function toContact(row: ContactRow): Contact { + return { + id: row.id, + displayName: row.display_name, + notes: row.notes, + role: row.role, + principalId: row.principal_id, + userFile: row.user_file, + contactType: row.contact_type, + createdAt: row.created_at, + updatedAt: row.updated_at, + }; +} + +function toContactChannel(row: ContactChannelRow): ContactChannel { + return { + id: row.id, + contactId: row.contact_id, + type: row.type, + address: row.address, + isPrimary: row.is_primary === 1, + externalUserId: row.external_user_id, + externalChatId: row.external_chat_id, + status: row.status, + policy: row.policy, + verifiedAt: row.verified_at, + verifiedVia: row.verified_via, + inviteId: row.invite_id, + revokedReason: row.revoked_reason, + blockedReason: row.blocked_reason, + lastSeenAt: row.last_seen_at, + interactionCount: row.interaction_count, + lastInteraction: row.last_interaction, + createdAt: row.created_at, + updatedAt: row.updated_at, + }; +} + +export class ContactStore { + private db: Database; + + private _getContact: Statement | null = null; + private _listContacts: Statement | null = null; + private _getContactByChannel: Statement | null = null; + private _getChannelsForContact: Statement | null = null; + + constructor(db?: Database) { + this.db = db ?? getGatewayDb(); + } + + getContact(contactId: string): Contact | null { + const stmt = + this._getContact ?? + (this._getContact = this.db.prepare( + "SELECT * FROM contacts WHERE id = ?", + )); + const row = stmt.get(contactId) as ContactRow | null; + return row ? toContact(row) : null; + } + + listContacts(): Contact[] { + const stmt = + this._listContacts ?? + (this._listContacts = this.db.prepare( + "SELECT * FROM contacts ORDER BY created_at DESC", + )); + return (stmt.all() as ContactRow[]).map(toContact); + } + + getContactByChannel( + channelType: string, + externalUserId: string, + ): Contact | null { + const stmt = + this._getContactByChannel ?? + (this._getContactByChannel = this.db.prepare( + `SELECT c.* FROM contacts c + JOIN contact_channels cc ON cc.contact_id = c.id + WHERE cc.type = ? AND cc.external_user_id = ? + LIMIT 1`, + )); + const row = stmt.get(channelType, externalUserId) as ContactRow | null; + return row ? toContact(row) : null; + } + + getChannelsForContact(contactId: string): ContactChannel[] { + const stmt = + this._getChannelsForContact ?? + (this._getChannelsForContact = this.db.prepare( + "SELECT * FROM contact_channels WHERE contact_id = ? ORDER BY created_at ASC", + )); + return (stmt.all(contactId) as ContactChannelRow[]).map(toContactChannel); + } +} diff --git a/gateway/src/db/data-migrations/index.ts b/gateway/src/db/data-migrations/index.ts index aa44031ade5..051ed86a763 100644 --- a/gateway/src/db/data-migrations/index.ts +++ b/gateway/src/db/data-migrations/index.ts @@ -20,6 +20,7 @@ import type { Database } from "bun:sqlite"; import { getLogger } from "../../logger.js"; import * as m0001 from "./m0001-guardian-init-lock.js"; +import * as m0002 from "./m0002-seed-contacts.js"; const log = getLogger("data-migrations"); @@ -32,6 +33,7 @@ type MigrationModule = { const MIGRATIONS: { key: string; mod: MigrationModule }[] = [ { key: "m0001-guardian-init-lock", mod: m0001 }, + { key: "m0002-seed-contacts", mod: m0002 }, ]; /** diff --git a/gateway/src/db/data-migrations/m0002-seed-contacts.ts b/gateway/src/db/data-migrations/m0002-seed-contacts.ts new file mode 100644 index 00000000000..c0005b03c54 --- /dev/null +++ b/gateway/src/db/data-migrations/m0002-seed-contacts.ts @@ -0,0 +1,189 @@ +/** + * One-time migration: seed `contacts` and `contact_channels` from + * the assistant's database (`assistant.db`) into the gateway database. + * + * After this migration runs, the gateway owns the canonical copy of + * contact auth/authz data. The assistant daemon reads it via IPC. + */ + +import { Database } from "bun:sqlite"; +import { existsSync } from "node:fs"; +import { join } from "node:path"; + +import { getWorkspaceDir } from "../../credential-reader.js"; +import { getLogger } from "../../logger.js"; +import { getGatewayDb } from "../connection.js"; + +import type { MigrationResult } from "./index.js"; + +const log = getLogger("m0002-seed-contacts"); + +function getAssistantDbPath(): string { + return join(getWorkspaceDir(), "data", "db", "assistant.db"); +} + +export function up(): MigrationResult { + const assistantDbPath = getAssistantDbPath(); + + if (!existsSync(assistantDbPath)) { + log.info("No assistant.db found — nothing to seed"); + return "done"; + } + + let assistantDb: Database; + try { + assistantDb = new Database(assistantDbPath, { readonly: true }); + } catch (err) { + log.error({ err }, "Failed to open assistant.db — will retry"); + return "skip"; + } + + try { + const gatewayDb = getGatewayDb(); + + // Check if the contacts table exists in assistant.db + const tableCheck = assistantDb + .prepare( + "SELECT name FROM sqlite_master WHERE type='table' AND name='contacts'", + ) + .get() as { name: string } | null; + + if (!tableCheck) { + log.info("No contacts table in assistant.db — nothing to seed"); + return "done"; + } + + const contacts = assistantDb + .prepare( + `SELECT id, display_name, notes, role, principal_id, user_file, contact_type, created_at, updated_at + FROM contacts`, + ) + .all() as { + id: string; + display_name: string; + notes: string | null; + role: string; + principal_id: string | null; + user_file: string | null; + contact_type: string; + created_at: number; + updated_at: number; + }[]; + + const channelTableCheck = assistantDb + .prepare( + "SELECT name FROM sqlite_master WHERE type='table' AND name='contact_channels'", + ) + .get() as { name: string } | null; + + const channels = channelTableCheck + ? (assistantDb + .prepare( + `SELECT id, contact_id, type, address, is_primary, external_user_id, + external_chat_id, status, policy, verified_at, verified_via, + invite_id, revoked_reason, blocked_reason, last_seen_at, + interaction_count, last_interaction, created_at, updated_at + FROM contact_channels`, + ) + .all() as { + id: string; + contact_id: string; + type: string; + address: string; + is_primary: number; + external_user_id: string | null; + external_chat_id: string | null; + status: string; + policy: string; + verified_at: number | null; + verified_via: string | null; + invite_id: string | null; + revoked_reason: string | null; + blocked_reason: string | null; + last_seen_at: number | null; + interaction_count: number; + last_interaction: number | null; + created_at: number; + updated_at: number | null; + }[]) + : []; + + // Insert everything inside a single transaction for atomicity + gatewayDb.exec("BEGIN IMMEDIATE"); + try { + const insertContact = gatewayDb.prepare( + `INSERT OR IGNORE INTO contacts + (id, display_name, notes, role, principal_id, user_file, contact_type, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`, + ); + + for (const c of contacts) { + insertContact.run( + c.id, + c.display_name, + c.notes, + c.role, + c.principal_id, + c.user_file, + c.contact_type, + c.created_at, + c.updated_at, + ); + } + + const insertChannel = gatewayDb.prepare( + `INSERT OR IGNORE INTO contact_channels + (id, contact_id, type, address, is_primary, external_user_id, + external_chat_id, status, policy, verified_at, verified_via, + invite_id, revoked_reason, blocked_reason, last_seen_at, + interaction_count, last_interaction, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + ); + + for (const ch of channels) { + insertChannel.run( + ch.id, + ch.contact_id, + ch.type, + ch.address, + ch.is_primary, + ch.external_user_id, + ch.external_chat_id, + ch.status, + ch.policy, + ch.verified_at, + ch.verified_via, + ch.invite_id, + ch.revoked_reason, + ch.blocked_reason, + ch.last_seen_at, + ch.interaction_count, + ch.last_interaction, + ch.created_at, + ch.updated_at, + ); + } + + gatewayDb.exec("COMMIT"); + + log.info( + { contacts: contacts.length, channels: channels.length }, + "Seeded contacts and contact_channels from assistant.db", + ); + } catch (err) { + gatewayDb.exec("ROLLBACK"); + throw err; + } + + return "done"; + } catch (err) { + log.error({ err }, "Failed to seed contacts — will retry"); + return "skip"; + } finally { + assistantDb.close(); + } +} + +export function down(): MigrationResult { + return "done"; +} diff --git a/gateway/src/index.ts b/gateway/src/index.ts index 910f6b36c27..ddbe0b5d0a1 100644 --- a/gateway/src/index.ts +++ b/gateway/src/index.ts @@ -120,6 +120,7 @@ import { isNewCommand, handleNewCommand } from "./webhook-pipeline.js"; import { reconcileTelegramWebhook } from "./telegram/webhook-manager.js"; import { registerEmailCallbackRoute } from "./email/register-callback.js"; import { GatewayIpcServer } from "./ipc/server.js"; +import { contactRoutes } from "./ipc/contact-handlers.js"; import { featureFlagRoutes, getMergedFeatureFlags, @@ -1773,7 +1774,10 @@ async function main() { configFileWatcher.start(); // ── IPC server ── - const ipcServer = new GatewayIpcServer([...featureFlagRoutes]); + const ipcServer = new GatewayIpcServer([ + ...featureFlagRoutes, + ...contactRoutes, + ]); ipcServer.start(); const featureFlagWatcher = new FeatureFlagWatcher(); diff --git a/gateway/src/ipc/contact-handlers.ts b/gateway/src/ipc/contact-handlers.ts new file mode 100644 index 00000000000..5ab158c0d42 --- /dev/null +++ b/gateway/src/ipc/contact-handlers.ts @@ -0,0 +1,65 @@ +/** + * IPC route definitions for contact reads. + * + * Exposes gateway-owned contact data (auth/authz) to the assistant + * daemon over the IPC socket. All methods are read-only. + */ + +import { z } from "zod"; + +import { ContactStore } from "../db/contact-store.js"; +import type { IpcRoute } from "./server.js"; + +let store: ContactStore | null = null; + +function getStore(): ContactStore { + if (!store) { + store = new ContactStore(); + } + return store; +} + +const GetContactParamsSchema = z.object({ + contactId: z.string(), +}); + +const GetContactByChannelParamsSchema = z.object({ + channelType: z.string(), + externalUserId: z.string(), +}); + +const GetChannelsForContactParamsSchema = z.object({ + contactId: z.string(), +}); + +export const contactRoutes: IpcRoute[] = [ + { + method: "list_contacts", + handler: () => getStore().listContacts(), + }, + { + method: "get_contact", + schema: GetContactParamsSchema, + handler: (params?: Record) => { + const contactId = params?.contactId as string; + return getStore().getContact(contactId); + }, + }, + { + method: "get_contact_by_channel", + schema: GetContactByChannelParamsSchema, + handler: (params?: Record) => { + const channelType = params?.channelType as string; + const externalUserId = params?.externalUserId as string; + return getStore().getContactByChannel(channelType, externalUserId); + }, + }, + { + method: "get_channels_for_contact", + schema: GetChannelsForContactParamsSchema, + handler: (params?: Record) => { + const contactId = params?.contactId as string; + return getStore().getChannelsForContact(contactId); + }, + }, +]; From 02703b17f187bf1c309e5eff10045b0cc784e1ad Mon Sep 17 00:00:00 2001 From: root Date: Wed, 15 Apr 2026 23:59:30 +0000 Subject: [PATCH 2/2] review: address Vargas feedback on PR #25951 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Strip contacts table to auth/authz-only: remove notes, user_file, contact_type columns (not needed for actor validation) - Remove m0002-seed-contacts data migration — hold off until endpoints have cutover and we're dual-writing - Move test imports to top level (no more inline await import()) - Use fake channel IDs in tests instead of real ones - Clean test state between runs (DELETE before seed) - Update ARCHITECTURE.md + gateway/ARCHITECTURE.md to document the contacts ownership migration direction - Add Drizzle migration + test preload env var cleanup tasks to workstream Up Next --- ARCHITECTURE.md | 3 +- gateway/ARCHITECTURE.md | 13 ++ .../src/__tests__/ipc-contact-routes.test.ts | 34 ++-- gateway/src/db/connection.ts | 3 - gateway/src/db/contact-store.ts | 9 - gateway/src/db/data-migrations/index.ts | 2 - .../db/data-migrations/m0002-seed-contacts.ts | 189 ------------------ 7 files changed, 33 insertions(+), 220 deletions(-) delete mode 100644 gateway/src/db/data-migrations/m0002-seed-contacts.ts diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index f3d6ef47f40..a2e50ddac99 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -189,6 +189,7 @@ In Docker mode (`IS_CONTAINERIZED=true`), services that need data from another s - **Trust rules**: The assistant reads/writes trust rules via the gateway's HTTP trust API. The gateway owns the filesystem copy at `/gateway-security/trust.json`. - **Credentials**: The assistant and gateway access credential CRUD via the CES HTTP API (`CES_CREDENTIAL_URL`), authenticated with `CES_SERVICE_TOKEN`. The CES owns the encryption keys at `/ces-security/`. +- **Contacts (auth/authz)**: The gateway owns `contacts` and `contact_channels` tables in its SQLite database (`/gateway-security/gateway.sqlite`). These tables store contact authentication and authorization data — who can talk to the assistant and what their channel policies are. The assistant daemon reads contact auth/authz data via IPC (`get_contact`, `list_contacts`, `get_contact_by_channel`, `get_channels_for_contact`). The assistant retains ownership of contact **context** (conversation history, memory associations, display preferences) in its own database. This separation is in progress — the gateway tables are declared and IPC handlers are wired, but endpoint cutover and data migration are not yet complete. ### Signing Key Bootstrap Protocol @@ -302,7 +303,7 @@ subgraph "Text Q&A Session" DB_TASKS["tasks"] DB_TASK_RUNS["task_runs"] DB_WORK_ITEMS["work_items"] - DB_CONTACTS["contacts"] + DB_CONTACTS["contacts
(migrating to gateway)"] end subgraph "Tracing" diff --git a/gateway/ARCHITECTURE.md b/gateway/ARCHITECTURE.md index 8fbf6b78e5a..b92b45f33eb 100644 --- a/gateway/ARCHITECTURE.md +++ b/gateway/ARCHITECTURE.md @@ -601,12 +601,23 @@ If no guardian binding exists for the channel, escalation fails closed -- the me #### SQLite Tables +**Assistant DB** (`assistant.db` — current owner, migrating to gateway): + | Table | Purpose | | --------------------------- | --------------------------------------------------------------------- | | `assistant_ingress_invites` | Invite tokens with SHA-256 hashes, expiry, use counts | | `contacts` | Contact records with role, relationship, and per-contact metadata | | `contact_channels` | Channel bindings per contact with access policy (allow/deny/escalate) | +**Gateway DB** (`gateway.sqlite` — future owner of auth/authz): + +| Table | Purpose | +| ------------------ | ---------------------------------------------------------------------- | +| `contacts` | Contact auth/authz: id, display_name, role, principal_id | +| `contact_channels` | Channel bindings with policy, status, external IDs, verification state | + +The gateway declares `contacts` and `contact_channels` tables and exposes them via IPC (`list_contacts`, `get_contact`, `get_contact_by_channel`, `get_channels_for_contact`). Endpoint cutover and data migration are in progress — the gateway will become the canonical owner once dual-writing is enabled. + #### Key Modules | Module | Purpose | @@ -616,6 +627,8 @@ If no guardian binding exists for the channel, escalation fails closed -- the me | `assistant/src/contacts/contacts-write.ts` | Contact and channel writes (upsert, policy changes, invite redemption) | | `assistant/src/daemon/handlers/config-inbox.ts` | Handlers for invite and member contracts | | `assistant/src/runtime/routes/channel-routes.ts` | ACL enforcement point -- member lookup, policy check, escalation creation | +| `gateway/src/db/contact-store.ts` | Gateway-side read-only ContactStore (prepared-statement queries) | +| `gateway/src/ipc/contact-handlers.ts` | IPC route handlers for contact reads | ### Telegram Credential Flow diff --git a/gateway/src/__tests__/ipc-contact-routes.test.ts b/gateway/src/__tests__/ipc-contact-routes.test.ts index 003c1ac9138..a5ecd70a4a2 100644 --- a/gateway/src/__tests__/ipc-contact-routes.test.ts +++ b/gateway/src/__tests__/ipc-contact-routes.test.ts @@ -5,6 +5,10 @@ import { tmpdir } from "node:os"; import { randomBytes } from "node:crypto"; import { createConnection, type Socket } from "node:net"; import { Database } from "bun:sqlite"; +import { GatewayIpcServer } from "../ipc/server.js"; +import { contactRoutes } from "../ipc/contact-handlers.js"; +import { ContactStore } from "../db/contact-store.js"; +import { getGatewayDb } from "../db/connection.js"; const testDir = join( tmpdir(), @@ -40,11 +44,6 @@ afterEach(() => { } }); -const { GatewayIpcServer } = await import("../ipc/server.js"); -const { contactRoutes } = await import("../ipc/contact-handlers.js"); -const { ContactStore } = await import("../db/contact-store.js"); -const { getGatewayDb } = await import("../db/connection.js"); - // --------------------------------------------------------------------------- // Helpers // --------------------------------------------------------------------------- @@ -89,29 +88,32 @@ function sendRequest( function seedTestData(db: Database): void { const now = Date.now(); + db.exec("DELETE FROM contact_channels"); + db.exec("DELETE FROM contacts"); + db.exec( - `INSERT INTO contacts (id, display_name, notes, role, principal_id, user_file, contact_type, created_at, updated_at) - VALUES ('c1', 'Vargas', 'Founding engineer', 'guardian', 'p1', NULL, 'human', ${now}, ${now})`, + `INSERT INTO contacts (id, display_name, role, principal_id, created_at, updated_at) + VALUES ('c1', 'Test Guardian', 'guardian', 'p1', ${now}, ${now})`, ); db.exec( - `INSERT INTO contacts (id, display_name, notes, role, principal_id, user_file, contact_type, created_at, updated_at) - VALUES ('c2', 'Alice', NULL, 'contact', NULL, NULL, 'human', ${now}, ${now})`, + `INSERT INTO contacts (id, display_name, role, principal_id, created_at, updated_at) + VALUES ('c2', 'Test Contact', 'contact', NULL, ${now}, ${now})`, ); db.exec( `INSERT INTO contact_channels (id, contact_id, type, address, is_primary, external_user_id, external_chat_id, status, policy, interaction_count, created_at) - VALUES ('ch1', 'c1', 'telegram', 'vargas@tg', 1, 'tg-123', 'chat-456', 'active', 'allow', 5, ${now})`, + VALUES ('ch1', 'c1', 'telegram', 'test-tg-user', 1, 'tg-fake-001', 'chat-fake-001', 'active', 'allow', 5, ${now})`, ); db.exec( `INSERT INTO contact_channels (id, contact_id, type, address, is_primary, external_user_id, external_chat_id, status, policy, interaction_count, created_at) - VALUES ('ch2', 'c1', 'slack', 'U05D5EGNNMS', 0, 'U05D5EGNNMS', 'D09Q9FG277H', 'active', 'allow', 10, ${now})`, + VALUES ('ch2', 'c1', 'slack', 'test-slack-user', 0, 'UFAKE00001', 'DFAKE00001', 'active', 'allow', 10, ${now})`, ); db.exec( `INSERT INTO contact_channels (id, contact_id, type, address, is_primary, external_user_id, external_chat_id, status, policy, interaction_count, created_at) - VALUES ('ch3', 'c2', 'email', 'alice@example.com', 1, NULL, NULL, 'unverified', 'escalate', 0, ${now})`, + VALUES ('ch3', 'c2', 'email', 'test@example.com', 1, NULL, NULL, 'unverified', 'escalate', 0, ${now})`, ); } @@ -137,7 +139,7 @@ describe("ContactStore", () => { const contact = store.getContact("c1"); expect(contact).not.toBeNull(); - expect(contact!.displayName).toBe("Vargas"); + expect(contact!.displayName).toBe("Test Guardian"); expect(contact!.role).toBe("guardian"); }); @@ -154,7 +156,7 @@ describe("ContactStore", () => { seedTestData(db); const store = new ContactStore(db); - const contact = store.getContactByChannel("telegram", "tg-123"); + const contact = store.getContactByChannel("telegram", "tg-fake-001"); expect(contact).not.toBeNull(); expect(contact!.id).toBe("c1"); }); @@ -245,7 +247,7 @@ describe("IPC contact routes", () => { expect(res.error).toBeUndefined(); const contact = res.result as { id: string; displayName: string }; expect(contact.id).toBe("c1"); - expect(contact.displayName).toBe("Vargas"); + expect(contact.displayName).toBe("Test Guardian"); }); test("get_contact returns null for unknown contact", async () => { @@ -268,7 +270,7 @@ describe("IPC contact routes", () => { await startServerAndConnect(); const res = await sendRequest(client, "get_contact_by_channel", { channelType: "slack", - externalUserId: "U05D5EGNNMS", + externalUserId: "UFAKE00001", }); expect(res.error).toBeUndefined(); diff --git a/gateway/src/db/connection.ts b/gateway/src/db/connection.ts index 4abc2d6e4b4..40c6d77434b 100644 --- a/gateway/src/db/connection.ts +++ b/gateway/src/db/connection.ts @@ -92,11 +92,8 @@ function migrate(db: Database): void { CREATE TABLE IF NOT EXISTS contacts ( id TEXT PRIMARY KEY, display_name TEXT NOT NULL, - notes TEXT, role TEXT NOT NULL DEFAULT 'contact', principal_id TEXT, - user_file TEXT, - contact_type TEXT NOT NULL DEFAULT 'human', created_at INTEGER NOT NULL, updated_at INTEGER NOT NULL ) diff --git a/gateway/src/db/contact-store.ts b/gateway/src/db/contact-store.ts index 19ebc0f35e9..404208ab837 100644 --- a/gateway/src/db/contact-store.ts +++ b/gateway/src/db/contact-store.ts @@ -4,11 +4,8 @@ import { getGatewayDb } from "./connection.js"; export type Contact = { id: string; displayName: string; - notes: string | null; role: string; principalId: string | null; - userFile: string | null; - contactType: string; createdAt: number; updatedAt: number; }; @@ -38,11 +35,8 @@ export type ContactChannel = { type ContactRow = { id: string; display_name: string; - notes: string | null; role: string; principal_id: string | null; - user_file: string | null; - contact_type: string; created_at: number; updated_at: number; }; @@ -73,11 +67,8 @@ function toContact(row: ContactRow): Contact { return { id: row.id, displayName: row.display_name, - notes: row.notes, role: row.role, principalId: row.principal_id, - userFile: row.user_file, - contactType: row.contact_type, createdAt: row.created_at, updatedAt: row.updated_at, }; diff --git a/gateway/src/db/data-migrations/index.ts b/gateway/src/db/data-migrations/index.ts index 051ed86a763..aa44031ade5 100644 --- a/gateway/src/db/data-migrations/index.ts +++ b/gateway/src/db/data-migrations/index.ts @@ -20,7 +20,6 @@ import type { Database } from "bun:sqlite"; import { getLogger } from "../../logger.js"; import * as m0001 from "./m0001-guardian-init-lock.js"; -import * as m0002 from "./m0002-seed-contacts.js"; const log = getLogger("data-migrations"); @@ -33,7 +32,6 @@ type MigrationModule = { const MIGRATIONS: { key: string; mod: MigrationModule }[] = [ { key: "m0001-guardian-init-lock", mod: m0001 }, - { key: "m0002-seed-contacts", mod: m0002 }, ]; /** diff --git a/gateway/src/db/data-migrations/m0002-seed-contacts.ts b/gateway/src/db/data-migrations/m0002-seed-contacts.ts deleted file mode 100644 index c0005b03c54..00000000000 --- a/gateway/src/db/data-migrations/m0002-seed-contacts.ts +++ /dev/null @@ -1,189 +0,0 @@ -/** - * One-time migration: seed `contacts` and `contact_channels` from - * the assistant's database (`assistant.db`) into the gateway database. - * - * After this migration runs, the gateway owns the canonical copy of - * contact auth/authz data. The assistant daemon reads it via IPC. - */ - -import { Database } from "bun:sqlite"; -import { existsSync } from "node:fs"; -import { join } from "node:path"; - -import { getWorkspaceDir } from "../../credential-reader.js"; -import { getLogger } from "../../logger.js"; -import { getGatewayDb } from "../connection.js"; - -import type { MigrationResult } from "./index.js"; - -const log = getLogger("m0002-seed-contacts"); - -function getAssistantDbPath(): string { - return join(getWorkspaceDir(), "data", "db", "assistant.db"); -} - -export function up(): MigrationResult { - const assistantDbPath = getAssistantDbPath(); - - if (!existsSync(assistantDbPath)) { - log.info("No assistant.db found — nothing to seed"); - return "done"; - } - - let assistantDb: Database; - try { - assistantDb = new Database(assistantDbPath, { readonly: true }); - } catch (err) { - log.error({ err }, "Failed to open assistant.db — will retry"); - return "skip"; - } - - try { - const gatewayDb = getGatewayDb(); - - // Check if the contacts table exists in assistant.db - const tableCheck = assistantDb - .prepare( - "SELECT name FROM sqlite_master WHERE type='table' AND name='contacts'", - ) - .get() as { name: string } | null; - - if (!tableCheck) { - log.info("No contacts table in assistant.db — nothing to seed"); - return "done"; - } - - const contacts = assistantDb - .prepare( - `SELECT id, display_name, notes, role, principal_id, user_file, contact_type, created_at, updated_at - FROM contacts`, - ) - .all() as { - id: string; - display_name: string; - notes: string | null; - role: string; - principal_id: string | null; - user_file: string | null; - contact_type: string; - created_at: number; - updated_at: number; - }[]; - - const channelTableCheck = assistantDb - .prepare( - "SELECT name FROM sqlite_master WHERE type='table' AND name='contact_channels'", - ) - .get() as { name: string } | null; - - const channels = channelTableCheck - ? (assistantDb - .prepare( - `SELECT id, contact_id, type, address, is_primary, external_user_id, - external_chat_id, status, policy, verified_at, verified_via, - invite_id, revoked_reason, blocked_reason, last_seen_at, - interaction_count, last_interaction, created_at, updated_at - FROM contact_channels`, - ) - .all() as { - id: string; - contact_id: string; - type: string; - address: string; - is_primary: number; - external_user_id: string | null; - external_chat_id: string | null; - status: string; - policy: string; - verified_at: number | null; - verified_via: string | null; - invite_id: string | null; - revoked_reason: string | null; - blocked_reason: string | null; - last_seen_at: number | null; - interaction_count: number; - last_interaction: number | null; - created_at: number; - updated_at: number | null; - }[]) - : []; - - // Insert everything inside a single transaction for atomicity - gatewayDb.exec("BEGIN IMMEDIATE"); - try { - const insertContact = gatewayDb.prepare( - `INSERT OR IGNORE INTO contacts - (id, display_name, notes, role, principal_id, user_file, contact_type, created_at, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`, - ); - - for (const c of contacts) { - insertContact.run( - c.id, - c.display_name, - c.notes, - c.role, - c.principal_id, - c.user_file, - c.contact_type, - c.created_at, - c.updated_at, - ); - } - - const insertChannel = gatewayDb.prepare( - `INSERT OR IGNORE INTO contact_channels - (id, contact_id, type, address, is_primary, external_user_id, - external_chat_id, status, policy, verified_at, verified_via, - invite_id, revoked_reason, blocked_reason, last_seen_at, - interaction_count, last_interaction, created_at, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, - ); - - for (const ch of channels) { - insertChannel.run( - ch.id, - ch.contact_id, - ch.type, - ch.address, - ch.is_primary, - ch.external_user_id, - ch.external_chat_id, - ch.status, - ch.policy, - ch.verified_at, - ch.verified_via, - ch.invite_id, - ch.revoked_reason, - ch.blocked_reason, - ch.last_seen_at, - ch.interaction_count, - ch.last_interaction, - ch.created_at, - ch.updated_at, - ); - } - - gatewayDb.exec("COMMIT"); - - log.info( - { contacts: contacts.length, channels: channels.length }, - "Seeded contacts and contact_channels from assistant.db", - ); - } catch (err) { - gatewayDb.exec("ROLLBACK"); - throw err; - } - - return "done"; - } catch (err) { - log.error({ err }, "Failed to seed contacts — will retry"); - return "skip"; - } finally { - assistantDb.close(); - } -} - -export function down(): MigrationResult { - return "done"; -}