diff --git a/assistant/src/runtime/__tests__/mirror-invite-to-gateway.test.ts b/assistant/src/runtime/__tests__/mirror-invite-to-gateway.test.ts new file mode 100644 index 00000000000..301a7996947 --- /dev/null +++ b/assistant/src/runtime/__tests__/mirror-invite-to-gateway.test.ts @@ -0,0 +1,154 @@ +/** + * mirrorInviteToGateway — daemon-side best-effort behavior. + * + * Track B PR-B-1: a mirror failure must NEVER throw to the createIngressInvite + * caller (the daemon-owned authoritative write is the source of truth). This + * test pins that contract by mocking the gateway IPC client to reject and + * asserting the helper resolves normally. + * + * Also asserts the wire payload contains every field daemon-side + * IngressInvite carries — so future schema additions on either side don't + * silently drift. + */ + +import { beforeEach, describe, expect, mock, test } from "bun:test"; + +type IpcCallArgs = { + method: string; + params?: Record; +}; + +const ipcCalls: IpcCallArgs[] = []; +let ipcCallImpl: ( + method: string, + params?: Record, +) => Promise = async () => ({}); + +mock.module("../../ipc/gateway-client.js", () => ({ + ipcCall: async (method: string, params?: Record) => { + ipcCalls.push({ method, params }); + return ipcCallImpl(method, params); + }, +})); + +// invite-service pulls a bunch of LLM/channel-adapter modules eagerly. Stub +// the ones that touch real I/O so the import doesn't side-effect. +mock.module("../channel-invite-transport.js", () => ({ + getInviteAdapterRegistry: () => ({}), + resolveAdapterHandle: () => undefined, +})); +mock.module("../invite-instruction-generator.js", () => ({ + generateInviteInstruction: async () => "", +})); +mock.module("../invite-redemption-service.js", () => ({ + redeemInvite: async () => ({}), + redeemVoiceInviteCode: async () => ({}), + redeemInviteByCode: async () => ({}), +})); +mock.module("../calls/call-domain.js", () => ({ + startInviteCall: async () => ({}), +})); + +const { mirrorInviteToGateway } = await import("../invite-service.js"); + +const baseInvite = () => ({ + id: "inv-daemon-1", + sourceChannel: "telegram", + tokenHash: "tok-h", + sourceConversationId: null, + note: null, + maxUses: 1, + useCount: 0, + expiresAt: Date.now() + 60_000, + status: "active" as const, + redeemedByExternalUserId: null, + redeemedByExternalChatId: null, + redeemedAt: null, + expectedExternalUserId: null, + voiceCodeHash: null, + voiceCodeDigits: null, + inviteCodeHash: null, + friendName: null, + guardianName: null, + contactId: "co-1", + createdAt: Date.now(), + updatedAt: Date.now(), +}); + +beforeEach(() => { + ipcCalls.length = 0; + ipcCallImpl = async () => ({}); +}); + +describe("mirrorInviteToGateway", () => { + test("fires mirror_invite_create with the full payload", async () => { + const invite = baseInvite(); + await mirrorInviteToGateway(invite); + + expect(ipcCalls).toHaveLength(1); + expect(ipcCalls[0]!.method).toBe("mirror_invite_create"); + const params = ipcCalls[0]!.params!; + + // Spot-check every field that flows over the wire. + for (const key of [ + "id", + "sourceChannel", + "tokenHash", + "sourceConversationId", + "note", + "maxUses", + "useCount", + "expiresAt", + "status", + "redeemedByExternalUserId", + "redeemedByExternalChatId", + "redeemedAt", + "expectedExternalUserId", + "voiceCodeHash", + "voiceCodeDigits", + "inviteCodeHash", + "friendName", + "guardianName", + "contactId", + "createdAt", + "updatedAt", + ] as const) { + expect(params).toHaveProperty(key); + } + + expect(params.id).toBe(invite.id); + expect(params.contactId).toBe(invite.contactId); + expect(params.tokenHash).toBe(invite.tokenHash); + }); + + test("swallows IPC errors (best-effort dual-write)", async () => { + ipcCallImpl = async () => { + throw new Error("gateway down"); + }; + + // The promise must resolve, not reject. + await expect(mirrorInviteToGateway(baseInvite())).resolves.toBeUndefined(); + expect(ipcCalls).toHaveLength(1); + }); + + test("forwards voice-invite fields when present", async () => { + const invite = { + ...baseInvite(), + sourceChannel: "phone", + expectedExternalUserId: "+15551234567", + voiceCodeHash: "voice-h", + voiceCodeDigits: 6, + friendName: "Alice", + guardianName: "Bob", + }; + await mirrorInviteToGateway(invite); + + const params = ipcCalls[0]!.params!; + expect(params.sourceChannel).toBe("phone"); + expect(params.expectedExternalUserId).toBe("+15551234567"); + expect(params.voiceCodeHash).toBe("voice-h"); + expect(params.voiceCodeDigits).toBe(6); + expect(params.friendName).toBe("Alice"); + expect(params.guardianName).toBe("Bob"); + }); +}); diff --git a/assistant/src/runtime/invite-service.ts b/assistant/src/runtime/invite-service.ts index 9a6af087b1c..ddae9767264 100644 --- a/assistant/src/runtime/invite-service.ts +++ b/assistant/src/runtime/invite-service.ts @@ -10,6 +10,7 @@ import { startInviteCall } from "../calls/call-domain.js"; import { isChannelId } from "../channels/types.js"; +import { ipcCall } from "../ipc/gateway-client.js"; import { createInvite, findById, @@ -26,6 +27,7 @@ import { DEFAULT_USER_REFERENCE, resolveGuardianName, } from "../prompts/user-reference.js"; +import { getLogger } from "../util/logger.js"; import { isValidE164 } from "../util/phone.js"; import { generateVoiceCode, hashVoiceCode } from "../util/voice-code.js"; import { @@ -39,6 +41,51 @@ import { type VoiceRedemptionOutcome, } from "./invite-redemption-service.js"; +const log = getLogger("invite-service"); + +/** + * Mirror an authoritative daemon-side invite row to the gateway's + * `ingress_invites` table via IPC. Best-effort: logs a warn on failure + * and never throws — the daemon's own write is the source of truth during + * Track B PR-B-1. + * + * See: memory/concepts/workstreams/track-b-invite-redemption.md + */ +export async function mirrorInviteToGateway( + invite: IngressInvite, +): Promise { + try { + await ipcCall("mirror_invite_create", { + id: invite.id, + sourceChannel: invite.sourceChannel, + tokenHash: invite.tokenHash, + sourceConversationId: invite.sourceConversationId, + note: invite.note, + maxUses: invite.maxUses, + useCount: invite.useCount, + expiresAt: invite.expiresAt, + status: invite.status, + redeemedByExternalUserId: invite.redeemedByExternalUserId, + redeemedByExternalChatId: invite.redeemedByExternalChatId, + redeemedAt: invite.redeemedAt, + expectedExternalUserId: invite.expectedExternalUserId, + voiceCodeHash: invite.voiceCodeHash, + voiceCodeDigits: invite.voiceCodeDigits, + inviteCodeHash: invite.inviteCodeHash, + friendName: invite.friendName, + guardianName: invite.guardianName, + contactId: invite.contactId, + createdAt: invite.createdAt, + updatedAt: invite.updatedAt, + }); + } catch (err) { + log.warn( + { err, inviteId: invite.id, contactId: invite.contactId }, + "createIngressInvite: gateway mirror dual-write failed (best-effort)", + ); + } +} + // --------------------------------------------------------------------------- // Response shapes — used by both HTTP routes and message handlers // --------------------------------------------------------------------------- @@ -232,6 +279,13 @@ export async function createIngressInvite(params: { : { inviteCodeHash }), }); + // Dual-write to the gateway's mirror table. Best-effort during Track B + // PR-B-1 — gateway DB is the future source of truth, assistant DB is the + // present-day source of truth, and the daemon owns invite creation today + // (LLM-generated guardian-instruction + channel-adapter resolution stay + // daemon-side for now). PR-B-2 will flip redemption gateway-native. + await mirrorInviteToGateway(invite); + // Build invite instruction for non-voice invites via LLM generation let guardianInstruction: string | undefined; let channelHandle: string | undefined; diff --git a/gateway/src/__tests__/invite-store.test.ts b/gateway/src/__tests__/invite-store.test.ts new file mode 100644 index 00000000000..ab18aa405ec --- /dev/null +++ b/gateway/src/__tests__/invite-store.test.ts @@ -0,0 +1,190 @@ +/** + * InviteStore.mirrorCreate — unit tests against the real bun-sqlite + * gateway DB. + * + * Track B PR-B-1: the daemon owns invite creation; the gateway holds a + * mirror row. These tests prove the mirror is idempotent on `id` (so a + * daemon retry after a transient mirror failure converges to the right + * state) and that voice + non-voice + invite-code shapes round-trip + * cleanly. + */ + +import { + describe, + test, + expect, + beforeAll, + beforeEach, + afterAll, +} from "bun:test"; + +import "./test-preload.js"; + +import { eq } from "drizzle-orm"; + +import { + initGatewayDb, + getGatewayDb, + resetGatewayDb, +} from "../db/connection.js"; +import { InviteStore } from "../db/invite-store.js"; +import { contacts, ingressInvites } from "../db/schema.js"; + +const CONTACT_ID = "co-invite-test"; + +function baseInvite(overrides: Partial = {}) { + const now = Date.now(); + return { + id: "inv-1", + sourceChannel: "telegram", + tokenHash: "tok-hash-1", + sourceConversationId: null, + note: null, + maxUses: 1, + useCount: 0, + expiresAt: now + 60_000, + status: "active" as const, + redeemedByExternalUserId: null, + redeemedByExternalChatId: null, + redeemedAt: null, + expectedExternalUserId: null, + voiceCodeHash: null, + voiceCodeDigits: null, + inviteCodeHash: null, + friendName: null, + guardianName: null, + contactId: CONTACT_ID, + createdAt: now, + updatedAt: now, + ...overrides, + }; +} + +beforeAll(async () => { + await initGatewayDb(); +}); + +beforeEach(() => { + const db = getGatewayDb(); + db.delete(ingressInvites).run(); + db.delete(contacts).run(); + const now = Date.now(); + db.insert(contacts) + .values({ + id: CONTACT_ID, + displayName: "Test Contact", + role: "contact", + principalId: null, + createdAt: now, + updatedAt: now, + }) + .run(); +}); + +afterAll(() => { + resetGatewayDb(); +}); + +describe("InviteStore.mirrorCreate", () => { + test("inserts a new mirror row when none exists", () => { + const store = new InviteStore(getGatewayDb()); + const row = store.mirrorCreate(baseInvite()); + + expect(row.id).toBe("inv-1"); + expect(row.sourceChannel).toBe("telegram"); + expect(row.tokenHash).toBe("tok-hash-1"); + expect(row.contactId).toBe(CONTACT_ID); + expect(row.maxUses).toBe(1); + expect(row.useCount).toBe(0); + expect(row.status).toBe("active"); + }); + + test("is idempotent on id — second call updates existing row", () => { + const store = new InviteStore(getGatewayDb()); + store.mirrorCreate(baseInvite({ useCount: 0 })); + + const updated = store.mirrorCreate( + baseInvite({ + useCount: 1, + status: "redeemed", + redeemedByExternalUserId: "ext-user-99", + redeemedAt: 12345, + }), + ); + + expect(updated.useCount).toBe(1); + expect(updated.status).toBe("redeemed"); + expect(updated.redeemedByExternalUserId).toBe("ext-user-99"); + expect(updated.redeemedAt).toBe(12345); + + const all = getGatewayDb() + .select() + .from(ingressInvites) + .where(eq(ingressInvites.id, "inv-1")) + .all(); + expect(all).toHaveLength(1); + }); + + test("round-trips a voice-invite shape", () => { + const store = new InviteStore(getGatewayDb()); + const row = store.mirrorCreate( + baseInvite({ + id: "inv-voice", + sourceChannel: "phone", + expectedExternalUserId: "+15551234567", + voiceCodeHash: "voice-h", + voiceCodeDigits: 6, + friendName: "Alice", + guardianName: "Bob", + }), + ); + + expect(row.sourceChannel).toBe("phone"); + expect(row.expectedExternalUserId).toBe("+15551234567"); + expect(row.voiceCodeHash).toBe("voice-h"); + expect(row.voiceCodeDigits).toBe(6); + expect(row.friendName).toBe("Alice"); + expect(row.guardianName).toBe("Bob"); + expect(row.inviteCodeHash).toBeNull(); + }); + + test("round-trips a non-voice invite-code shape", () => { + const store = new InviteStore(getGatewayDb()); + const row = store.mirrorCreate( + baseInvite({ + id: "inv-code", + sourceChannel: "slack", + inviteCodeHash: "code-h", + }), + ); + + expect(row.sourceChannel).toBe("slack"); + expect(row.inviteCodeHash).toBe("code-h"); + expect(row.voiceCodeHash).toBeNull(); + expect(row.expectedExternalUserId).toBeNull(); + }); + + test("getInvite returns the stored row by id, or undefined", () => { + const store = new InviteStore(getGatewayDb()); + store.mirrorCreate(baseInvite({ id: "inv-fetch" })); + + const found = store.getInvite("inv-fetch"); + expect(found?.id).toBe("inv-fetch"); + + expect(store.getInvite("nonexistent")).toBeUndefined(); + }); + + test("uses the injected db instance instead of the global", () => { + const db = getGatewayDb(); + const store = new InviteStore(db); + store.mirrorCreate(baseInvite({ id: "inv-inject" })); + + // Direct query through the injected db sees the same row. + const row = db + .select() + .from(ingressInvites) + .where(eq(ingressInvites.id, "inv-inject")) + .get(); + expect(row?.id).toBe("inv-inject"); + }); +}); diff --git a/gateway/src/__tests__/ipc-invite-routes.test.ts b/gateway/src/__tests__/ipc-invite-routes.test.ts new file mode 100644 index 00000000000..16cbda60b61 --- /dev/null +++ b/gateway/src/__tests__/ipc-invite-routes.test.ts @@ -0,0 +1,202 @@ +/** + * IPC route `mirror_invite_create` — integration test against the real + * GatewayIpcServer + bun-sqlite gateway DB. + * + * Verifies: + * - Happy path inserts a mirror row. + * - Repeated call with same id is idempotent (gateway → status update). + * - Zod-validation rejects malformed params before the store is touched. + * - Foreign-key violation (unknown contactId) surfaces as an error so + * daemon-side log.warn captures it. + */ + +import { + describe, + test, + expect, + beforeAll, + beforeEach, + afterAll, + afterEach, +} from "bun:test"; +import { existsSync, rmSync } from "node:fs"; +import { join } from "node:path"; +import { randomBytes } from "node:crypto"; +import { createConnection, type Socket } from "node:net"; + +import { testWorkspaceDir } from "./test-preload.js"; + +import { + initGatewayDb, + getGatewayDb, + resetGatewayDb, +} from "../db/connection.js"; +import { contacts, ingressInvites } from "../db/schema.js"; +import { GatewayIpcServer } from "../ipc/server.js"; +import { + _resetInviteStoreForTests, + inviteRoutes, +} from "../ipc/invite-handlers.js"; + +const socketPath = join(testWorkspaceDir, "gateway-invite.sock"); +const CONTACT_ID = "co-invite-ipc"; + +function connectClient(path: string): Promise { + return new Promise((resolve, reject) => { + const client = createConnection(path, () => resolve(client)); + client.on("error", reject); + }); +} + +function sendRequest( + client: Socket, + method: string, + params?: Record, +): Promise<{ id: string; result?: unknown; error?: string }> { + return new Promise((resolve, reject) => { + const id = randomBytes(4).toString("hex"); + let buffer = ""; + const onData = (chunk: Buffer) => { + buffer += chunk.toString(); + const newlineIdx = buffer.indexOf("\n"); + if (newlineIdx !== -1) { + const line = buffer.slice(0, newlineIdx).trim(); + buffer = buffer.slice(newlineIdx + 1); + client.off("data", onData); + try { + resolve(JSON.parse(line)); + } catch (err) { + reject(err); + } + } + }; + client.on("data", onData); + const msg = JSON.stringify({ id, method, params }); + client.write(msg + "\n"); + }); +} + +function basePayload(overrides: Record = {}) { + const now = Date.now(); + return { + id: "inv-ipc-1", + sourceChannel: "telegram", + tokenHash: "tok-h", + maxUses: 1, + useCount: 0, + expiresAt: now + 60_000, + status: "active", + contactId: CONTACT_ID, + createdAt: now, + updatedAt: now, + ...overrides, + }; +} + +beforeAll(async () => { + await initGatewayDb(); +}); + +beforeEach(() => { + const db = getGatewayDb(); + db.delete(ingressInvites).run(); + db.delete(contacts).run(); + const now = Date.now(); + db.insert(contacts) + .values({ + id: CONTACT_ID, + displayName: "Test Contact", + role: "contact", + principalId: null, + createdAt: now, + updatedAt: now, + }) + .run(); + _resetInviteStoreForTests(); +}); + +afterAll(() => { + resetGatewayDb(); +}); + +describe("IPC mirror_invite_create", () => { + let server: InstanceType; + let client: Socket; + + beforeEach(() => { + if (existsSync(socketPath)) rmSync(socketPath); + }); + + afterEach(() => { + client?.destroy(); + server?.stop(); + }); + + async function startServerAndConnect(): Promise { + server = new GatewayIpcServer([...inviteRoutes]); + (server as unknown as { socketPath: string }).socketPath = socketPath; + server.start(); + await new Promise((resolve) => setTimeout(resolve, 50)); + client = await connectClient(socketPath); + } + + test("writes a new mirror row via IPC", async () => { + await startServerAndConnect(); + const res = await sendRequest(client, "mirror_invite_create", basePayload()); + + expect(res.error).toBeUndefined(); + expect(res.result).toEqual({ id: "inv-ipc-1" }); + + const row = getGatewayDb() + .select() + .from(ingressInvites) + .all() + .find((r) => r.id === "inv-ipc-1"); + expect(row).toBeDefined(); + expect(row!.sourceChannel).toBe("telegram"); + expect(row!.tokenHash).toBe("tok-h"); + expect(row!.contactId).toBe(CONTACT_ID); + }); + + test("is idempotent on id (second call updates, no duplicate)", async () => { + await startServerAndConnect(); + await sendRequest(client, "mirror_invite_create", basePayload()); + await sendRequest( + client, + "mirror_invite_create", + basePayload({ useCount: 1, status: "redeemed" }), + ); + + const rows = getGatewayDb().select().from(ingressInvites).all(); + expect(rows).toHaveLength(1); + expect(rows[0]!.useCount).toBe(1); + expect(rows[0]!.status).toBe("redeemed"); + }); + + test("zod validation rejects malformed params", async () => { + await startServerAndConnect(); + + // Missing required fields (e.g. no contactId, no expiresAt). + const res = await sendRequest(client, "mirror_invite_create", { + id: "inv-bad", + }); + + expect(res.error).toBeDefined(); + // Nothing should have been written. + const rows = getGatewayDb().select().from(ingressInvites).all(); + expect(rows).toHaveLength(0); + }); + + test("foreign-key violation surfaces as an error (unknown contactId)", async () => { + await startServerAndConnect(); + const res = await sendRequest( + client, + "mirror_invite_create", + basePayload({ contactId: "no-such-contact" }), + ); + + expect(res.error).toBeDefined(); + const rows = getGatewayDb().select().from(ingressInvites).all(); + expect(rows).toHaveLength(0); + }); +}); diff --git a/gateway/src/db/invite-store.ts b/gateway/src/db/invite-store.ts new file mode 100644 index 00000000000..1c19af7aeaf --- /dev/null +++ b/gateway/src/db/invite-store.ts @@ -0,0 +1,94 @@ +/** + * Gateway-side invite store. + * + * The gateway's `ingress_invites` table is a **mirror** of the daemon's + * `assistant_ingress_invites` table during the gateway-security-migration + * Track B handoff. Today's writers: + * + * - Daemon `createInvite()` → IPC `mirror_invite_create` → this store + * + * Once redemption goes gateway-native (Track B PR-B-2) this store will also + * own use-count bumps and status flips, and will dual-write back to the + * daemon (matching Track A's pattern). + * + * Failure policy: every write is best-effort from the caller's perspective. + * This store throws on DB errors; the IPC handler logs and continues so the + * daemon-side caller can ignore mirror failures without rolling back the + * authoritative write. + */ + +import { eq } from "drizzle-orm"; + +import { type GatewayDb, getGatewayDb } from "./connection.js"; +import { ingressInvites } from "./schema.js"; + +export type IngressInvite = typeof ingressInvites.$inferSelect; + +export class InviteStore { + private injectedDb?: GatewayDb; + + constructor(db?: GatewayDb) { + this.injectedDb = db; + } + + private get db(): GatewayDb { + return this.injectedDb ?? getGatewayDb(); + } + + /** + * Insert (or replace) a mirror row for an invite that was just created + * authoritatively on the daemon. Idempotent on `id` so a daemon retry + * after a transient gateway failure converges to the right state. + */ + mirrorCreate(invite: typeof ingressInvites.$inferInsert): IngressInvite { + const existing = this.db + .select() + .from(ingressInvites) + .where(eq(ingressInvites.id, invite.id)) + .get(); + + if (existing) { + this.db + .update(ingressInvites) + .set({ + sourceChannel: invite.sourceChannel, + tokenHash: invite.tokenHash, + sourceConversationId: invite.sourceConversationId ?? null, + note: invite.note ?? null, + maxUses: invite.maxUses ?? 1, + useCount: invite.useCount ?? 0, + expiresAt: invite.expiresAt, + status: invite.status ?? "active", + redeemedByExternalUserId: invite.redeemedByExternalUserId ?? null, + redeemedByExternalChatId: invite.redeemedByExternalChatId ?? null, + redeemedAt: invite.redeemedAt ?? null, + expectedExternalUserId: invite.expectedExternalUserId ?? null, + voiceCodeHash: invite.voiceCodeHash ?? null, + voiceCodeDigits: invite.voiceCodeDigits ?? null, + inviteCodeHash: invite.inviteCodeHash ?? null, + friendName: invite.friendName ?? null, + guardianName: invite.guardianName ?? null, + contactId: invite.contactId, + updatedAt: invite.updatedAt, + }) + .where(eq(ingressInvites.id, invite.id)) + .run(); + } else { + this.db.insert(ingressInvites).values(invite).run(); + } + + return this.db + .select() + .from(ingressInvites) + .where(eq(ingressInvites.id, invite.id)) + .get()!; + } + + getInvite(id: string): IngressInvite | undefined { + return this.db + .select() + .from(ingressInvites) + .where(eq(ingressInvites.id, id)) + .get(); + } +} diff --git a/gateway/src/db/schema.ts b/gateway/src/db/schema.ts index 380ce80e4d1..488f0cd8385 100644 --- a/gateway/src/db/schema.ts +++ b/gateway/src/db/schema.ts @@ -126,7 +126,11 @@ export const ingressInvites = sqliteTable( { id: text("id").primaryKey(), sourceChannel: text("source_channel").notNull(), - inviteCodeHash: text("invite_code_hash").notNull(), + // Long-form invite token hash (primary lookup for token-based redemption). + tokenHash: text("token_hash").notNull(), + // Origin conversation, for backreference. NULL for invites not created + // from a conversation context. + sourceConversationId: text("source_conversation_id"), note: text("note"), maxUses: integer("max_uses").notNull().default(1), useCount: integer("use_count").notNull().default(0), @@ -135,6 +139,16 @@ export const ingressInvites = sqliteTable( redeemedByExternalUserId: text("redeemed_by_external_user_id"), redeemedByExternalChatId: text("redeemed_by_external_chat_id"), redeemedAt: integer("redeemed_at"), + // Voice invite fields (NULL for non-voice invites). + expectedExternalUserId: text("expected_external_user_id"), + voiceCodeHash: text("voice_code_hash"), + voiceCodeDigits: integer("voice_code_digits"), + // 6-digit invite code hash (NULL for voice invites, which use + // voiceCodeHash instead). + inviteCodeHash: text("invite_code_hash"), + // Display metadata for personalized voice prompts (NULL for non-voice). + friendName: text("friend_name"), + guardianName: text("guardian_name"), contactId: text("contact_id") .notNull() .references(() => contacts.id, { onDelete: "cascade" }), @@ -142,10 +156,15 @@ export const ingressInvites = sqliteTable( updatedAt: integer("updated_at").notNull(), }, (table) => [ + index("idx_ingress_invites_token_lookup").on(table.tokenHash), index("idx_ingress_invites_code_lookup").on( table.inviteCodeHash, table.sourceChannel, ), + index("idx_ingress_invites_voice_lookup").on( + table.voiceCodeHash, + table.sourceChannel, + ), index("idx_ingress_invites_contact").on(table.contactId), ], ); diff --git a/gateway/src/index.ts b/gateway/src/index.ts index 93cb107b1a1..8d3dd5fe238 100644 --- a/gateway/src/index.ts +++ b/gateway/src/index.ts @@ -171,6 +171,7 @@ import { import { GatewayIpcServer } from "./ipc/server.js"; import { contactRoutes } from "./ipc/contact-handlers.js"; import { featureFlagRoutes } from "./ipc/feature-flag-handlers.js"; +import { inviteRoutes } from "./ipc/invite-handlers.js"; import { thresholdRoutes } from "./ipc/threshold-handlers.js"; import { riskClassificationRoutes } from "./ipc/risk-classification-handlers.js"; @@ -2116,6 +2117,7 @@ async function main() { const ipcServer = new GatewayIpcServer([ ...featureFlagRoutes, ...contactRoutes, + ...inviteRoutes, ...thresholdRoutes, ...riskClassificationRoutes, ...createVelayRoutes(velayTunnelClient), diff --git a/gateway/src/ipc/invite-handlers.ts b/gateway/src/ipc/invite-handlers.ts new file mode 100644 index 00000000000..3d5f029040a --- /dev/null +++ b/gateway/src/ipc/invite-handlers.ts @@ -0,0 +1,109 @@ +/** + * IPC route definitions for invite mirroring. + * + * Direction: assistant daemon → gateway. The daemon owns invite creation + * during Track B PR-B-1: it generates the token / voice-code / invite-code, + * resolves channel adapters, and runs the LLM-driven guardian-instruction + * builder. After persisting to its own `assistant_ingress_invites` table, + * the daemon calls this handler to populate the gateway's mirror row. + * + * Failure policy: the handler logs and re-raises. The daemon-side call site + * is best-effort (`.catch(log.warn)`) so a mirror failure never breaks the + * authoritative write. + */ + +import { z } from "zod"; + +import { InviteStore } from "../db/invite-store.js"; +import { getLogger } from "../logger.js"; +import type { IpcRoute } from "./server.js"; + +const log = getLogger("invite-handlers"); + +let store: InviteStore | null = null; + +function getStore(): InviteStore { + if (!store) { + store = new InviteStore(); + } + return store; +} + +/** + * Reset the cached store. Tests inject their own DB by clearing the + * singleton between cases. + */ +export function _resetInviteStoreForTests(): void { + store = null; +} + +const MirrorInviteCreateParamsSchema = z.object({ + id: z.string().min(1), + sourceChannel: z.string().min(1), + tokenHash: z.string().min(1), + sourceConversationId: z.string().nullable().optional(), + note: z.string().nullable().optional(), + maxUses: z.number().int().min(1), + useCount: z.number().int().min(0), + expiresAt: z.number().int(), + status: z.enum(["active", "redeemed", "revoked", "expired"]), + redeemedByExternalUserId: z.string().nullable().optional(), + redeemedByExternalChatId: z.string().nullable().optional(), + redeemedAt: z.number().int().nullable().optional(), + expectedExternalUserId: z.string().nullable().optional(), + voiceCodeHash: z.string().nullable().optional(), + voiceCodeDigits: z.number().int().nullable().optional(), + inviteCodeHash: z.string().nullable().optional(), + friendName: z.string().nullable().optional(), + guardianName: z.string().nullable().optional(), + contactId: z.string().min(1), + createdAt: z.number().int(), + updatedAt: z.number().int(), +}); + +export const inviteRoutes: IpcRoute[] = [ + { + method: "mirror_invite_create", + schema: MirrorInviteCreateParamsSchema, + handler: (params?: Record) => { + const parsed = MirrorInviteCreateParamsSchema.parse(params); + try { + const row = getStore().mirrorCreate({ + ...parsed, + sourceConversationId: parsed.sourceConversationId ?? null, + note: parsed.note ?? null, + redeemedByExternalUserId: parsed.redeemedByExternalUserId ?? null, + redeemedByExternalChatId: parsed.redeemedByExternalChatId ?? null, + redeemedAt: parsed.redeemedAt ?? null, + expectedExternalUserId: parsed.expectedExternalUserId ?? null, + voiceCodeHash: parsed.voiceCodeHash ?? null, + voiceCodeDigits: parsed.voiceCodeDigits ?? null, + inviteCodeHash: parsed.inviteCodeHash ?? null, + friendName: parsed.friendName ?? null, + guardianName: parsed.guardianName ?? null, + }); + log.info( + { + inviteId: row.id, + sourceChannel: row.sourceChannel, + contactId: row.contactId, + status: row.status, + }, + "mirror_invite_create: gateway mirror row written", + ); + return { id: row.id }; + } catch (err) { + log.error( + { + err, + inviteId: parsed.id, + sourceChannel: parsed.sourceChannel, + contactId: parsed.contactId, + }, + "mirror_invite_create: gateway mirror write failed", + ); + throw err; + } + }, + }, +];