Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 180 additions & 0 deletions assistant/src/__tests__/db-proxy-transaction.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/**
* Tests for the `db_proxy_transaction` IPC handler.
*
* Verifies all-or-nothing semantics: every step commits together, any
* exception or `requireChanges` violation rolls the entire batch back.
*
* Uses the real DB (via `initializeDb()`); the test preload points
* `VELLUM_WORKSPACE_DIR` at a per-file temp dir.
*/

import { beforeEach, describe, expect, test } from "bun:test";

import { handleDbProxyTransaction } from "../ipc/routes/db-proxy-transaction.js";
import { getSqlite } from "../memory/db-connection.js";
import { initializeDb } from "../memory/db-init.js";

initializeDb();

function resetTestTable(): void {
const sqlite = getSqlite();
sqlite.exec("DROP TABLE IF EXISTS proxy_tx_test");
sqlite.exec(
"CREATE TABLE proxy_tx_test (id INTEGER PRIMARY KEY, label TEXT NOT NULL, count INTEGER NOT NULL DEFAULT 0)",
);
}

function rowCount(): number {
const result = getSqlite()
.prepare("SELECT COUNT(*) AS n FROM proxy_tx_test")
.get() as { n: number };
return result.n;
}

describe("db_proxy_transaction", () => {
beforeEach(() => {
resetTestTable();
});

test("commits multiple inserts atomically", () => {
const result = handleDbProxyTransaction({
steps: [
{
sql: "INSERT INTO proxy_tx_test (id, label) VALUES (?, ?)",
bind: [1, "alpha"],
},
{
sql: "INSERT INTO proxy_tx_test (id, label) VALUES (?, ?)",
bind: [2, "beta"],
},
],
});

expect(result.ok).toBe(true);
if (result.ok) {
expect(result.results).toHaveLength(2);
expect(result.results[0].changes).toBe(1);
expect(result.results[1].changes).toBe(1);
}
expect(rowCount()).toBe(2);
});

test("rolls back all writes when a later step throws (SQL constraint)", () => {
// Pre-existing row that the transaction will collide with.
getSqlite()
.prepare("INSERT INTO proxy_tx_test (id, label) VALUES (?, ?)")
.run(2, "preexisting");

expect(() =>
handleDbProxyTransaction({
steps: [
{
sql: "INSERT INTO proxy_tx_test (id, label) VALUES (?, ?)",
bind: [1, "alpha"],
},
// Primary-key collision triggers a SqliteError mid-transaction.
{
sql: "INSERT INTO proxy_tx_test (id, label) VALUES (?, ?)",
bind: [2, "beta"],
},
],
}),
).toThrow();

// The first step's insert must NOT have committed.
expect(rowCount()).toBe(1);
const remaining = getSqlite()
.prepare("SELECT label FROM proxy_tx_test WHERE id = ?")
.get(2) as { label: string };
expect(remaining.label).toBe("preexisting");
});

test("requireChanges aborts the transaction when unmet", () => {
// Seed a row to update; condition will not match so changes = 0.
getSqlite()
.prepare("INSERT INTO proxy_tx_test (id, label, count) VALUES (?, ?, ?)")
.run(1, "active", 0);

const result = handleDbProxyTransaction({
steps: [
{
sql: "INSERT INTO proxy_tx_test (id, label) VALUES (?, ?)",
bind: [99, "should-rollback"],
},
{
// No row matches label='nonexistent', so changes = 0.
sql: "UPDATE proxy_tx_test SET count = count + 1 WHERE label = ?",
bind: ["nonexistent"],
requireChanges: 1,
},
],
});

expect(result.ok).toBe(false);
if (!result.ok) {
expect(result.reason).toBe("require_changes_failed");
expect(result.failedStep).toBe(1);
expect(result.actualChanges).toBe(0);
expect(result.requiredChanges).toBe(1);
}

// The earlier insert must have rolled back.
expect(rowCount()).toBe(1);
const remaining = getSqlite()
.prepare("SELECT label FROM proxy_tx_test WHERE id = ?")
.get(1) as { label: string };
expect(remaining.label).toBe("active");
});

test("requireChanges allows the transaction to commit when met", () => {
getSqlite()
.prepare("INSERT INTO proxy_tx_test (id, label, count) VALUES (?, ?, ?)")
.run(1, "active", 0);

const result = handleDbProxyTransaction({
steps: [
{
sql: "INSERT INTO proxy_tx_test (id, label) VALUES (?, ?)",
bind: [2, "new"],
},
{
sql: "UPDATE proxy_tx_test SET count = count + 1 WHERE label = ?",
bind: ["active"],
requireChanges: 1,
},
],
});

expect(result.ok).toBe(true);
if (result.ok) {
expect(result.results[1].changes).toBe(1);
}
expect(rowCount()).toBe(2);
const updated = getSqlite()
.prepare("SELECT count FROM proxy_tx_test WHERE id = ?")
.get(1) as { count: number };
expect(updated.count).toBe(1);
});

test("rejects empty step list", () => {
expect(() => handleDbProxyTransaction({ steps: [] })).toThrow(
/at least one step/,
);
});

test("returns lastInsertRowid for inserts", () => {
const result = handleDbProxyTransaction({
steps: [
{
sql: "INSERT INTO proxy_tx_test (label) VALUES (?)",
bind: ["solo"],
},
],
});

expect(result.ok).toBe(true);
if (result.ok) {
expect(result.results[0].lastInsertRowid).toBeGreaterThan(0);
}
});
});
16 changes: 12 additions & 4 deletions assistant/src/ipc/assistant-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ import {
writeStreamEnd,
} from "./ipc-framing.js";
import { type DbProxyParams, handleDbProxy } from "./routes/db-proxy.js";
import {
type DbProxyTransactionParams,
handleDbProxyTransaction,
} from "./routes/db-proxy-transaction.js";
import { routeDefinitionsToIpcMethods } from "./routes/route-adapter.js";
import { ensureSocketPathFree } from "./socket-cleanup.js";
import { resolveIpcSocketPath } from "./socket-path.js";
Expand Down Expand Up @@ -167,13 +171,17 @@ export class AssistantIpcServer {
this.methods.set(route.operationId, route.handler);
}

// ⚠️ TEMPORARY — gateway→assistant DB proxy (see ipc/routes/db-proxy.ts).
// This is the ONLY route defined directly here; all other routes go in ROUTES.
// Remove once contacts/guardian-binding logic is fully migrated to the
// gateway's own database.
// ⚠️ TEMPORARY — gateway→assistant DB proxies (see ipc/routes/db-proxy.ts
// and ipc/routes/db-proxy-transaction.ts). These are the ONLY routes
// defined directly here; all other routes go in ROUTES. Remove once
// contacts/guardian-binding logic is fully migrated to the gateway's
// own database.
this.methods.set("db_proxy", (params) =>
handleDbProxy(params as unknown as DbProxyParams),
);
this.methods.set("db_proxy_transaction", (params) =>
handleDbProxyTransaction(params as unknown as DbProxyTransactionParams),
);

this.watchdog = new SocketWatchdog({
socketPath: this.socketPath,
Expand Down
139 changes: 139 additions & 0 deletions assistant/src/ipc/routes/db-proxy-transaction.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/**
* ⚠️ TEMPORARY HACK — DO NOT EXTEND ⚠️
*
* IPC route that lets the gateway execute multiple write statements against
* the assistant's SQLite database inside a single atomic transaction.
*
* Companion to db-proxy.ts. Exists because some gateway-orchestrated writes
* (e.g. invite redemption: upsert contact channel + record invite use) must
* be all-or-nothing. With the contacts/guardian/invite tables still living
* in the assistant DB, the gateway needs a way to commit several writes
* atomically there.
*
* Each step is a write (INSERT/UPDATE/DELETE). All steps run inside a
* BEGIN IMMEDIATE transaction. If any step throws — including a step whose
* `requireChanges` constraint is unmet — the entire transaction rolls back.
*
* Read-modify-write across steps is not supported (the IPC is one-shot;
* later steps cannot react to earlier step results except via SQL conditions
* embedded in the WHERE clause and the optional `requireChanges` guard).
*
* This route is intentionally NOT in the shared ROUTES array — it is a
* private implementation detail between the gateway and assistant IPC
* servers and must not be discoverable by clients or the OpenAPI spec.
*
* Remove once contacts/guardian/invite logic is fully migrated to the
* gateway's own database.
*/

import { getSqlite } from "../../memory/db-connection.js";
import { getLogger } from "../../util/logger.js";

const log = getLogger("db-proxy-transaction");

/** Column value types that SQLite can return. */
type SqliteValue = string | number | null | Uint8Array;

export interface DbProxyTransactionStep {
/** The SQL write statement to execute. */
sql: string;
/** Positional bind parameters. */
bind?: SqliteValue[];
/**
* If set, abort the transaction (rollback) when this step's row-change
* count is below this threshold. Used for stale-write detection — e.g.
* "increment use_count only if status = 'active' AND use_count < max_uses",
* with `requireChanges: 1` to abort when no rows match.
*/
requireChanges?: number;
}

export interface DbProxyTransactionParams {
steps: DbProxyTransactionStep[];
}

export type DbProxyTransactionResult =
| {
ok: true;
results: Array<{ changes: number; lastInsertRowid: number }>;
}
| {
ok: false;
reason: "require_changes_failed";
failedStep: number;
actualChanges: number;
requiredChanges: number;
};

export function handleDbProxyTransaction(
params: DbProxyTransactionParams,
): DbProxyTransactionResult {
const db = getSqlite();

if (!Array.isArray(params.steps) || params.steps.length === 0) {
throw new Error("db_proxy_transaction requires at least one step");
}

// Sentinel used to abort the transaction without leaking through as a generic
// SQL error. Better-sqlite3 rolls back when the inner function throws.
class RequireChangesFailure extends Error {
constructor(
public failedStep: number,
public actualChanges: number,
public requiredChanges: number,
) {
super(
`Step ${failedStep} affected ${actualChanges} rows, requires ${requiredChanges}`,
);
}
}

const results: Array<{ changes: number; lastInsertRowid: number }> = [];

try {
db.transaction(() => {
for (let i = 0; i < params.steps.length; i++) {
const step = params.steps[i];
const stmt = db.prepare(step.sql);
const result = step.bind ? stmt.run(...step.bind) : stmt.run();
const changes = result.changes;
results.push({
changes,
lastInsertRowid: Number(result.lastInsertRowid),
});

if (
step.requireChanges !== undefined &&
changes < step.requireChanges
) {
throw new RequireChangesFailure(i, changes, step.requireChanges);
}
}
}).immediate();
} catch (err) {
if (err instanceof RequireChangesFailure) {
log.debug(
{
failedStep: err.failedStep,
actualChanges: err.actualChanges,
requiredChanges: err.requiredChanges,
},
"db-proxy-transaction aborted by requireChanges guard",
);
return {
ok: false,
reason: "require_changes_failed",
failedStep: err.failedStep,
actualChanges: err.actualChanges,
requiredChanges: err.requiredChanges,
};
}
throw err;
}

log.debug(
{ stepCount: params.steps.length },
"db-proxy-transaction committed",
);
return { ok: true, results };
}
Loading
Loading