diff --git a/assistant/src/__tests__/db-proxy-transaction.test.ts b/assistant/src/__tests__/db-proxy-transaction.test.ts new file mode 100644 index 00000000000..7ced6d2fc9d --- /dev/null +++ b/assistant/src/__tests__/db-proxy-transaction.test.ts @@ -0,0 +1,180 @@ +/** + * Tests for the `db_proxy_transaction` IPC handler. + * + * Verifies all-or-nothing semantics: every step commits together, any + * exception or `requireChanges` violation rolls the entire batch back. + * + * Uses the real DB (via `initializeDb()`); the test preload points + * `VELLUM_WORKSPACE_DIR` at a per-file temp dir. + */ + +import { beforeEach, describe, expect, test } from "bun:test"; + +import { handleDbProxyTransaction } from "../ipc/routes/db-proxy-transaction.js"; +import { getSqlite } from "../memory/db-connection.js"; +import { initializeDb } from "../memory/db-init.js"; + +initializeDb(); + +function resetTestTable(): void { + const sqlite = getSqlite(); + sqlite.exec("DROP TABLE IF EXISTS proxy_tx_test"); + sqlite.exec( + "CREATE TABLE proxy_tx_test (id INTEGER PRIMARY KEY, label TEXT NOT NULL, count INTEGER NOT NULL DEFAULT 0)", + ); +} + +function rowCount(): number { + const result = getSqlite() + .prepare("SELECT COUNT(*) AS n FROM proxy_tx_test") + .get() as { n: number }; + return result.n; +} + +describe("db_proxy_transaction", () => { + beforeEach(() => { + resetTestTable(); + }); + + test("commits multiple inserts atomically", () => { + const result = handleDbProxyTransaction({ + steps: [ + { + sql: "INSERT INTO proxy_tx_test (id, label) VALUES (?, ?)", + bind: [1, "alpha"], + }, + { + sql: "INSERT INTO proxy_tx_test (id, label) VALUES (?, ?)", + bind: [2, "beta"], + }, + ], + }); + + expect(result.ok).toBe(true); + if (result.ok) { + expect(result.results).toHaveLength(2); + expect(result.results[0].changes).toBe(1); + expect(result.results[1].changes).toBe(1); + } + expect(rowCount()).toBe(2); + }); + + test("rolls back all writes when a later step throws (SQL constraint)", () => { + // Pre-existing row that the transaction will collide with. + getSqlite() + .prepare("INSERT INTO proxy_tx_test (id, label) VALUES (?, ?)") + .run(2, "preexisting"); + + expect(() => + handleDbProxyTransaction({ + steps: [ + { + sql: "INSERT INTO proxy_tx_test (id, label) VALUES (?, ?)", + bind: [1, "alpha"], + }, + // Primary-key collision triggers a SqliteError mid-transaction. + { + sql: "INSERT INTO proxy_tx_test (id, label) VALUES (?, ?)", + bind: [2, "beta"], + }, + ], + }), + ).toThrow(); + + // The first step's insert must NOT have committed. + expect(rowCount()).toBe(1); + const remaining = getSqlite() + .prepare("SELECT label FROM proxy_tx_test WHERE id = ?") + .get(2) as { label: string }; + expect(remaining.label).toBe("preexisting"); + }); + + test("requireChanges aborts the transaction when unmet", () => { + // Seed a row to update; condition will not match so changes = 0. + getSqlite() + .prepare("INSERT INTO proxy_tx_test (id, label, count) VALUES (?, ?, ?)") + .run(1, "active", 0); + + const result = handleDbProxyTransaction({ + steps: [ + { + sql: "INSERT INTO proxy_tx_test (id, label) VALUES (?, ?)", + bind: [99, "should-rollback"], + }, + { + // No row matches label='nonexistent', so changes = 0. + sql: "UPDATE proxy_tx_test SET count = count + 1 WHERE label = ?", + bind: ["nonexistent"], + requireChanges: 1, + }, + ], + }); + + expect(result.ok).toBe(false); + if (!result.ok) { + expect(result.reason).toBe("require_changes_failed"); + expect(result.failedStep).toBe(1); + expect(result.actualChanges).toBe(0); + expect(result.requiredChanges).toBe(1); + } + + // The earlier insert must have rolled back. + expect(rowCount()).toBe(1); + const remaining = getSqlite() + .prepare("SELECT label FROM proxy_tx_test WHERE id = ?") + .get(1) as { label: string }; + expect(remaining.label).toBe("active"); + }); + + test("requireChanges allows the transaction to commit when met", () => { + getSqlite() + .prepare("INSERT INTO proxy_tx_test (id, label, count) VALUES (?, ?, ?)") + .run(1, "active", 0); + + const result = handleDbProxyTransaction({ + steps: [ + { + sql: "INSERT INTO proxy_tx_test (id, label) VALUES (?, ?)", + bind: [2, "new"], + }, + { + sql: "UPDATE proxy_tx_test SET count = count + 1 WHERE label = ?", + bind: ["active"], + requireChanges: 1, + }, + ], + }); + + expect(result.ok).toBe(true); + if (result.ok) { + expect(result.results[1].changes).toBe(1); + } + expect(rowCount()).toBe(2); + const updated = getSqlite() + .prepare("SELECT count FROM proxy_tx_test WHERE id = ?") + .get(1) as { count: number }; + expect(updated.count).toBe(1); + }); + + test("rejects empty step list", () => { + expect(() => handleDbProxyTransaction({ steps: [] })).toThrow( + /at least one step/, + ); + }); + + test("returns lastInsertRowid for inserts", () => { + const result = handleDbProxyTransaction({ + steps: [ + { + sql: "INSERT INTO proxy_tx_test (label) VALUES (?)", + bind: ["solo"], + }, + ], + }); + + expect(result.ok).toBe(true); + if (result.ok) { + expect(result.results[0].lastInsertRowid).toBeGreaterThan(0); + } + }); +}); diff --git a/assistant/src/ipc/assistant-server.ts b/assistant/src/ipc/assistant-server.ts index b641a337a30..24d4c4b134c 100644 --- a/assistant/src/ipc/assistant-server.ts +++ b/assistant/src/ipc/assistant-server.ts @@ -53,6 +53,10 @@ import { writeStreamEnd, } from "./ipc-framing.js"; import { type DbProxyParams, handleDbProxy } from "./routes/db-proxy.js"; +import { + type DbProxyTransactionParams, + handleDbProxyTransaction, +} from "./routes/db-proxy-transaction.js"; import { routeDefinitionsToIpcMethods } from "./routes/route-adapter.js"; import { ensureSocketPathFree } from "./socket-cleanup.js"; import { resolveIpcSocketPath } from "./socket-path.js"; @@ -167,13 +171,17 @@ export class AssistantIpcServer { this.methods.set(route.operationId, route.handler); } - // ⚠️ TEMPORARY — gateway→assistant DB proxy (see ipc/routes/db-proxy.ts). - // This is the ONLY route defined directly here; all other routes go in ROUTES. - // Remove once contacts/guardian-binding logic is fully migrated to the - // gateway's own database. + // ⚠️ TEMPORARY — gateway→assistant DB proxies (see ipc/routes/db-proxy.ts + // and ipc/routes/db-proxy-transaction.ts). These are the ONLY routes + // defined directly here; all other routes go in ROUTES. Remove once + // contacts/guardian-binding logic is fully migrated to the gateway's + // own database. this.methods.set("db_proxy", (params) => handleDbProxy(params as unknown as DbProxyParams), ); + this.methods.set("db_proxy_transaction", (params) => + handleDbProxyTransaction(params as unknown as DbProxyTransactionParams), + ); this.watchdog = new SocketWatchdog({ socketPath: this.socketPath, diff --git a/assistant/src/ipc/routes/db-proxy-transaction.ts b/assistant/src/ipc/routes/db-proxy-transaction.ts new file mode 100644 index 00000000000..2b7ace2fd6f --- /dev/null +++ b/assistant/src/ipc/routes/db-proxy-transaction.ts @@ -0,0 +1,139 @@ +/** + * ⚠️ TEMPORARY HACK — DO NOT EXTEND ⚠️ + * + * IPC route that lets the gateway execute multiple write statements against + * the assistant's SQLite database inside a single atomic transaction. + * + * Companion to db-proxy.ts. Exists because some gateway-orchestrated writes + * (e.g. invite redemption: upsert contact channel + record invite use) must + * be all-or-nothing. With the contacts/guardian/invite tables still living + * in the assistant DB, the gateway needs a way to commit several writes + * atomically there. + * + * Each step is a write (INSERT/UPDATE/DELETE). All steps run inside a + * BEGIN IMMEDIATE transaction. If any step throws — including a step whose + * `requireChanges` constraint is unmet — the entire transaction rolls back. + * + * Read-modify-write across steps is not supported (the IPC is one-shot; + * later steps cannot react to earlier step results except via SQL conditions + * embedded in the WHERE clause and the optional `requireChanges` guard). + * + * This route is intentionally NOT in the shared ROUTES array — it is a + * private implementation detail between the gateway and assistant IPC + * servers and must not be discoverable by clients or the OpenAPI spec. + * + * Remove once contacts/guardian/invite logic is fully migrated to the + * gateway's own database. + */ + +import { getSqlite } from "../../memory/db-connection.js"; +import { getLogger } from "../../util/logger.js"; + +const log = getLogger("db-proxy-transaction"); + +/** Column value types that SQLite can return. */ +type SqliteValue = string | number | null | Uint8Array; + +export interface DbProxyTransactionStep { + /** The SQL write statement to execute. */ + sql: string; + /** Positional bind parameters. */ + bind?: SqliteValue[]; + /** + * If set, abort the transaction (rollback) when this step's row-change + * count is below this threshold. Used for stale-write detection — e.g. + * "increment use_count only if status = 'active' AND use_count < max_uses", + * with `requireChanges: 1` to abort when no rows match. + */ + requireChanges?: number; +} + +export interface DbProxyTransactionParams { + steps: DbProxyTransactionStep[]; +} + +export type DbProxyTransactionResult = + | { + ok: true; + results: Array<{ changes: number; lastInsertRowid: number }>; + } + | { + ok: false; + reason: "require_changes_failed"; + failedStep: number; + actualChanges: number; + requiredChanges: number; + }; + +export function handleDbProxyTransaction( + params: DbProxyTransactionParams, +): DbProxyTransactionResult { + const db = getSqlite(); + + if (!Array.isArray(params.steps) || params.steps.length === 0) { + throw new Error("db_proxy_transaction requires at least one step"); + } + + // Sentinel used to abort the transaction without leaking through as a generic + // SQL error. Better-sqlite3 rolls back when the inner function throws. + class RequireChangesFailure extends Error { + constructor( + public failedStep: number, + public actualChanges: number, + public requiredChanges: number, + ) { + super( + `Step ${failedStep} affected ${actualChanges} rows, requires ${requiredChanges}`, + ); + } + } + + const results: Array<{ changes: number; lastInsertRowid: number }> = []; + + try { + db.transaction(() => { + for (let i = 0; i < params.steps.length; i++) { + const step = params.steps[i]; + const stmt = db.prepare(step.sql); + const result = step.bind ? stmt.run(...step.bind) : stmt.run(); + const changes = result.changes; + results.push({ + changes, + lastInsertRowid: Number(result.lastInsertRowid), + }); + + if ( + step.requireChanges !== undefined && + changes < step.requireChanges + ) { + throw new RequireChangesFailure(i, changes, step.requireChanges); + } + } + }).immediate(); + } catch (err) { + if (err instanceof RequireChangesFailure) { + log.debug( + { + failedStep: err.failedStep, + actualChanges: err.actualChanges, + requiredChanges: err.requiredChanges, + }, + "db-proxy-transaction aborted by requireChanges guard", + ); + return { + ok: false, + reason: "require_changes_failed", + failedStep: err.failedStep, + actualChanges: err.actualChanges, + requiredChanges: err.requiredChanges, + }; + } + throw err; + } + + log.debug( + { stepCount: params.steps.length }, + "db-proxy-transaction committed", + ); + return { ok: true, results }; +} diff --git a/gateway/src/db/assistant-db-proxy.ts b/gateway/src/db/assistant-db-proxy.ts index 852d937d96c..8f5aa331214 100644 --- a/gateway/src/db/assistant-db-proxy.ts +++ b/gateway/src/db/assistant-db-proxy.ts @@ -66,3 +66,58 @@ export async function assistantDbRun( export async function assistantDbExec(sql: string): Promise { await dbProxy(sql, "exec"); } + +// --------------------------------------------------------------------------- +// Transaction helper +// --------------------------------------------------------------------------- + +export interface AssistantDbTransactionStep { + /** The SQL write statement to execute. */ + sql: string; + /** Positional bind parameters. */ + bind?: SqliteValue[]; + /** + * If set, abort the transaction (rollback) when this step's row-change + * count is below this threshold. Used for stale-write detection — e.g. + * "increment use_count only if status = 'active' AND use_count < max_uses", + * with `requireChanges: 1` to abort when no rows match. + */ + requireChanges?: number; +} + +export type AssistantDbTransactionResult = + | { + ok: true; + results: Array<{ changes: number; lastInsertRowid: number }>; + } + | { + ok: false; + reason: "require_changes_failed"; + failedStep: number; + actualChanges: number; + requiredChanges: number; + }; + +/** + * Execute multiple write statements against the assistant's SQLite DB inside + * a single atomic transaction (BEGIN IMMEDIATE). All steps commit together; + * any throw — including a `requireChanges` constraint failure — rolls back + * the entire batch. + * + * Use this when several writes must succeed or fail as a unit (e.g. invite + * redemption: contact-channel upsert + invite-use record). + * + * Read-modify-write across steps is not supported. Use SQL-level conditions + * (WHERE clauses, ON CONFLICT) plus `requireChanges` for stale-write detection. + */ +export async function assistantDbTransaction( + steps: AssistantDbTransactionStep[], +): Promise { + const result = await ipcCallAssistant("db_proxy_transaction", { steps }); + if (result === undefined) { + throw new Error( + "db_proxy_transaction IPC call failed — assistant may not be ready", + ); + } + return result as AssistantDbTransactionResult; +}