Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 33 additions & 7 deletions assistant/src/__tests__/db-proxy-transaction.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { beforeEach, describe, expect, test } from "bun:test";
import { handleDbProxyTransaction } from "../ipc/routes/db-proxy-transaction.js";
import { getSqlite } from "../memory/db-connection.js";
import { initializeDb } from "../memory/db-init.js";
import { RouteError } from "../runtime/routes/errors.js";

initializeDb();

Expand Down Expand Up @@ -65,7 +66,8 @@ describe("db_proxy_transaction", () => {
.prepare("INSERT INTO proxy_tx_test (id, label) VALUES (?, ?)")
.run(2, "preexisting");

expect(() =>
let caught: unknown;
try {
handleDbProxyTransaction({
steps: [
{
Expand All @@ -78,8 +80,23 @@ describe("db_proxy_transaction", () => {
bind: [2, "beta"],
},
],
}),
).toThrow();
});
} catch (err) {
caught = err;
}

// The thrown error must be a RouteError carrying the underlying SQL
// message — without the wrapping, the IPC envelope would lose the
// statusCode and the gateway-side strict caller would misclassify
// this as a transport failure ("assistant may not be ready").
expect(caught).toBeInstanceOf(RouteError);
if (caught instanceof RouteError) {
expect(caught.code).toBe("DB_PROXY_TRANSACTION_FAILED");
expect(caught.statusCode).toBe(500);
// The original SQL constraint message must survive the wrap so
// operators can debug from the gateway logs.
expect(caught.message).toMatch(/UNIQUE|PRIMARY KEY|constraint/i);
}

// The first step's insert must NOT have committed.
expect(rowCount()).toBe(1);
Expand Down Expand Up @@ -156,10 +173,19 @@ describe("db_proxy_transaction", () => {
expect(updated.count).toBe(1);
});

test("rejects empty step list", () => {
expect(() => handleDbProxyTransaction({ steps: [] })).toThrow(
/at least one step/,
);
test("rejects empty step list with a 400 RouteError", () => {
let caught: unknown;
try {
handleDbProxyTransaction({ steps: [] });
} catch (err) {
caught = err;
}
expect(caught).toBeInstanceOf(RouteError);
if (caught instanceof RouteError) {
expect(caught.statusCode).toBe(400);
expect(caught.code).toBe("INVALID_PARAMS");
expect(caught.message).toMatch(/at least one step/);
}
});

test("returns lastInsertRowid for inserts", () => {
Expand Down
16 changes: 14 additions & 2 deletions assistant/src/ipc/routes/db-proxy-transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
*/

import { getSqlite } from "../../memory/db-connection.js";
import { RouteError } from "../../runtime/routes/errors.js";
import { getLogger } from "../../util/logger.js";

const log = getLogger("db-proxy-transaction");
Expand Down Expand Up @@ -71,7 +72,11 @@ export function handleDbProxyTransaction(
const db = getSqlite();

if (!Array.isArray(params.steps) || params.steps.length === 0) {
throw new Error("db_proxy_transaction requires at least one step");
throw new RouteError(
"db_proxy_transaction requires at least one step",
"INVALID_PARAMS",
400,
);
}

// Sentinel used to abort the transaction without leaking through as a generic
Expand Down Expand Up @@ -128,7 +133,14 @@ export function handleDbProxyTransaction(
requiredChanges: err.requiredChanges,
};
}
throw err;
// Wrap raw SQL/runtime errors in RouteError so the IPC envelope carries
// a statusCode + errorCode. Without this, the gateway-side strict caller
// sees a structureless `msg.error` and misclassifies it as a transport
// failure ("assistant may not be ready"), masking the real SQL error
// and breaking debuggability + retry decisions.
const message = err instanceof Error ? err.message : String(err);
log.warn({ err }, "db-proxy-transaction execution failed");
throw new RouteError(message, "DB_PROXY_TRANSACTION_FAILED", 500);
}

log.debug(
Expand Down
10 changes: 5 additions & 5 deletions gateway/src/__tests__/contact-prompt-submit.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ initSigningKey(Buffer.from("test-signing-key-at-least-32-bytes-long-xx"));
let testAssistantDb: Database | null = null;

mock.module("../db/assistant-db-proxy.js", () => ({
// eslint-disable-next-line @typescript-eslint/no-explicit-any
async assistantDbQuery(sql: string, bind?: any[]) {
if (!testAssistantDb) throw new Error("test assistant DB not initialized");
const stmt = testAssistantDb.prepare(sql);
return bind ? stmt.all(...bind) : stmt.all();
},
// eslint-disable-next-line @typescript-eslint/no-explicit-any
async assistantDbRun(sql: string, bind?: any[]) {
if (!testAssistantDb) throw new Error("test assistant DB not initialized");
const stmt = testAssistantDb.prepare(sql);
Expand Down Expand Up @@ -187,7 +187,7 @@ describe("handleContactPromptSubmit", () => {

// IPC should have been called with the guardian contactId.
expect(ipcMock).toHaveBeenCalledTimes(1);
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const ipcCall = (ipcMock.mock.calls as any[][])[0][1] as { body: Record<string, unknown> };
expect(ipcCall.body.contactId).toBe("guardian-1");
});
Expand Down Expand Up @@ -258,7 +258,7 @@ describe("handleContactPromptSubmit", () => {

// IPC should have been called with an error so the CLI doesn't hang.
expect(ipcMock).toHaveBeenCalledTimes(1);
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const ipcCall = (ipcMock.mock.calls as any[][])[0][1] as { body: Record<string, unknown> };
expect(typeof ipcCall.body.error).toBe("string");
});
Expand Down Expand Up @@ -315,7 +315,7 @@ describe("handleContactPromptSubmit", () => {
expect(contacts).toHaveLength(1);
expect(contacts[0].id).toBe("contact-1");

// eslint-disable-next-line @typescript-eslint/no-explicit-any
const ipcCall = (ipcMock.mock.calls as any[][])[0][1] as { body: Record<string, unknown> };
expect(ipcCall.body.contactId).toBe("contact-1");
});
Expand Down
40 changes: 27 additions & 13 deletions gateway/src/db/assistant-db-proxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
* gateway's own database.
*/

import { ipcCallAssistant } from "../ipc/assistant-client.js";
import {
IpcHandlerError,
ipcCallAssistant,
} from "../ipc/assistant-client.js";

type SqliteValue = string | number | null | Uint8Array;

Expand All @@ -28,11 +31,11 @@ async function dbProxy(
mode: "query" | "run" | "exec",
bind?: SqliteValue[],
): Promise<DbProxyResult> {
const result = await ipcCallAssistant("db_proxy", { sql, mode, bind });
if (result === undefined) {
throw new Error("db_proxy IPC call failed — assistant may not be ready");
}
return result as DbProxyResult;
return (await ipcCallAssistant("db_proxy", {
sql,
mode,
bind,
})) as DbProxyResult;
}

/**
Expand Down Expand Up @@ -107,17 +110,28 @@ export type AssistantDbTransactionResult =
* Use this when several writes must succeed or fail as a unit (e.g. invite
* redemption: contact-channel upsert + invite-use record).
*
* Error handling:
* - `requireChanges` violations return `{ ok: false, reason: "require_changes_failed", ... }`.
* - Handler-level failures (SQL constraint errors, malformed params) throw
* `IpcHandlerError` so the underlying SQL message is preserved.
* - Transport failures (socket missing, daemon unreachable, timeout) throw
* `IpcTransportError`. Use this to distinguish retryable vs.
* non-retryable failures.
*
* Read-modify-write across steps is not supported. Use SQL-level conditions
* (WHERE clauses, ON CONFLICT) plus `requireChanges` for stale-write detection.
*/
export async function assistantDbTransaction(
steps: AssistantDbTransactionStep[],
): Promise<AssistantDbTransactionResult> {
const result = await ipcCallAssistant("db_proxy_transaction", { steps });
if (result === undefined) {
throw new Error(
"db_proxy_transaction IPC call failed — assistant may not be ready",
);
}
return result as AssistantDbTransactionResult;
return (await ipcCallAssistant("db_proxy_transaction", {
steps,
})) as AssistantDbTransactionResult;
}

/**
* Re-export so callers in this module's domain (gateway DB write helpers)
* can identify SQL/handler failures from the assistant DB proxy without
* importing from the IPC client directly.
*/
export { IpcHandlerError };
29 changes: 29 additions & 0 deletions gateway/src/db/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,35 @@ export const contactChannels = sqliteTable(
],
);

export const ingressInvites = sqliteTable(
"ingress_invites",
{
id: text("id").primaryKey(),
sourceChannel: text("source_channel").notNull(),
inviteCodeHash: text("invite_code_hash").notNull(),
note: text("note"),
maxUses: integer("max_uses").notNull().default(1),
useCount: integer("use_count").notNull().default(0),
expiresAt: integer("expires_at").notNull(),
status: text("status").notNull().default("active"),
redeemedByExternalUserId: text("redeemed_by_external_user_id"),
redeemedByExternalChatId: text("redeemed_by_external_chat_id"),
redeemedAt: integer("redeemed_at"),
contactId: text("contact_id")
.notNull()
.references(() => contacts.id, { onDelete: "cascade" }),
createdAt: integer("created_at").notNull(),
updatedAt: integer("updated_at").notNull(),
},
(table) => [
index("idx_ingress_invites_code_lookup").on(
table.inviteCodeHash,
table.sourceChannel,
),
index("idx_ingress_invites_contact").on(table.contactId),
],
);

// ---------------------------------------------------------------------------
// Auto-approve thresholds
// ---------------------------------------------------------------------------
Expand Down
19 changes: 8 additions & 11 deletions gateway/src/handlers/handle-inbound.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,9 @@ export async function handleInbound(

// ── Contact channel interaction tracking (dual-write) ──
// Reads from the assistant DB (source of truth during migration),
// writes to both assistant DB and gateway DB. Uses ipcCallAssistant
// directly (resolves to undefined on failure, never throws) so
// socket errors cannot leak as unhandled rejections in tests.
// writes to both assistant DB and gateway DB. Fire-and-forget so
// IPC failures here cannot leak as unhandled rejections.
if (!response.denied) {
// Fire-and-forget: detach from current async context so pending
// IPC socket operations cannot leak into test runners.
void touchContactChannelStats(event, response.duplicate).catch(
() => {},
);
Expand Down Expand Up @@ -211,8 +208,8 @@ interface DbProxyResult {
* Look up the contact channel in the assistant DB and dual-write
* interaction stats to both the assistant and gateway databases.
*
* Uses ipcCallAssistant directly (resolves to undefined on failure)
* so socket errors cannot surface as unhandled rejections.
* Caller wraps in `.catch(() => {})` so IPC failures cannot surface as
* unhandled rejections.
*/
async function touchContactChannelStats(
event: GatewayInboundEvent,
Expand All @@ -234,17 +231,17 @@ async function touchContactChannelStats(
sql: "SELECT id FROM contact_channels WHERE type = ? AND external_user_id = ? LIMIT 1",
mode: "query",
bind: [event.sourceChannel, canonicalActorId],
})) as DbProxyResult | undefined;
})) as DbProxyResult;

if (!result?.rows?.length) {
if (!result.rows?.length) {
result = (await ipcCallAssistant("db_proxy", {
sql: "SELECT id FROM contact_channels WHERE type = ? AND external_chat_id = ? LIMIT 1",
mode: "query",
bind: [event.sourceChannel, event.message.conversationExternalId],
})) as DbProxyResult | undefined;
})) as DbProxyResult;
}

if (!result?.rows?.length) return;
if (!result.rows?.length) return;

const channelId = result.rows[0].id as string;
const now = Date.now();
Expand Down
59 changes: 44 additions & 15 deletions gateway/src/http/routes/contact-prompt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,10 @@ export async function handleContactPromptSubmit(req: Request): Promise<Response>
{ channelType, address: normalizedAddress, contactId, existingContactId: existingChannel[0].contactId },
"contact-prompt-submit: channel already assigned to another contact",
);
await ipcCallAssistant("resolve_contact_prompt", {
body: { requestId, error: "Channel already assigned to another contact" },
});
await notifyDaemonResolveError(
requestId,
"Channel already assigned to another contact",
);
return Response.json(
{ accepted: false, error: "Channel already assigned to another contact" },
{ status: 409 },
Expand Down Expand Up @@ -223,9 +224,10 @@ export async function handleContactPromptSubmit(req: Request): Promise<Response>
}

// Notify daemon of failure so the CLI doesn't hang.
await ipcCallAssistant("resolve_contact_prompt", {
body: { requestId, error: "Failed to create contact channel" },
});
await notifyDaemonResolveError(
requestId,
"Failed to create contact channel",
);
return Response.json(
{ accepted: false, error: "Failed to create contact channel" },
{ status: 500 },
Expand All @@ -239,22 +241,49 @@ export async function handleContactPromptSubmit(req: Request): Promise<Response>
}
} catch (err) {
log.error({ err, requestId }, "contact-prompt-submit: DB error");
await ipcCallAssistant("resolve_contact_prompt", {
body: { requestId, error: "Database error" },
});
await notifyDaemonResolveError(requestId, "Database error");
return Response.json({ accepted: false, error: "Database error" }, { status: 500 });
}

// Notify daemon to unblock the waiting contacts/prompt IPC call.
const ipcResult = await ipcCallAssistant("resolve_contact_prompt", {
body: { requestId, contactId, channelId, channelType, address: normalizedAddress },
});
if (!ipcResult || (ipcResult as { resolved?: boolean }).resolved === false) {
try {
const ipcResult = await ipcCallAssistant("resolve_contact_prompt", {
body: { requestId, contactId, channelId, channelType, address: normalizedAddress },
});
if ((ipcResult as { resolved?: boolean }).resolved === false) {
log.warn(
{ requestId, contactId },
"contact-prompt-submit: resolve_contact_prompt IPC did not find a pending prompt — CLI may time out",
);
}
} catch (err) {
log.warn(
{ requestId, contactId },
"contact-prompt-submit: resolve_contact_prompt IPC did not find a pending prompt — CLI may time out",
{ err, requestId, contactId },
"contact-prompt-submit: resolve_contact_prompt IPC failed — CLI may time out",
);
}

return Response.json({ accepted: true });
}

/**
* Best-effort notification to the daemon that a pending contact prompt has
* resolved with an error. Failures here must not block the HTTP response —
* the caller has already decided the request failed; we just want to wake
* the CLI up.
*/
async function notifyDaemonResolveError(
requestId: string,
error: string,
): Promise<void> {
try {
await ipcCallAssistant("resolve_contact_prompt", {
body: { requestId, error },
});
} catch (err) {
log.warn(
{ err, requestId },
"contact-prompt-submit: resolve_contact_prompt error notification failed",
);
}
}
2 changes: 1 addition & 1 deletion gateway/src/http/routes/contacts-control-plane-proxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ export function createContactsControlPlaneProxyHandler(config: GatewayConfig) {
getGatewayDb().delete(contacts).where(eq(contacts.id, contactId)).run();
void ipcCallAssistant("emit_event", {
body: { kind: "contacts_changed" },
} as unknown as Record<string, unknown>);
} as unknown as Record<string, unknown>).catch(() => {});
log.info({ contactId }, "delete_contact: deleted");
return new Response(null, { status: 204 });
},
Expand Down
Loading
Loading