diff --git a/assistant/src/cli/commands/db/__tests__/repair.test.ts b/assistant/src/cli/commands/db/__tests__/repair.test.ts index 88103de903d..e54cdd19d36 100644 --- a/assistant/src/cli/commands/db/__tests__/repair.test.ts +++ b/assistant/src/cli/commands/db/__tests__/repair.test.ts @@ -19,6 +19,7 @@ import { mkdtempSync, openSync, rmSync, + writeFileSync, writeSync, } from "node:fs"; import { tmpdir } from "node:os"; @@ -169,21 +170,24 @@ describe("assistant db repair — healthy DB", () => { expect(stdout).toContain("integrity-check"); expect(stdout).toContain("ok"); expect(stdout).toContain("no corruption detected"); - expect(stdout).toMatch(/Done\. 1 step ran: 1 ok, 0 failed/); + expect(stdout).toContain("conversation-backfill"); + expect(stdout).toMatch(/Done\. 2 steps ran: 2 ok, 0 failed/); }); - test("--json emits a structured report with the step result", async () => { + test("--json emits a structured report with all step results", async () => { seedHealthyDb(); const { stdout, exitCode } = await runRepair(["--json", "repair"]); expect(exitCode).toBe(0); const parsed = JSON.parse(stdout); expect(parsed.dbPath).toBe(dbPath); - expect(parsed.steps).toHaveLength(1); + expect(parsed.steps).toHaveLength(2); expect(parsed.steps[0].name).toBe("integrity-check"); expect(parsed.steps[0].result.status).toBe("ok"); expect(parsed.steps[0].result.data.errorCount).toBe(0); expect(typeof parsed.steps[0].result.durationMs).toBe("number"); - expect(parsed.okCount).toBe(1); + expect(parsed.steps[1].name).toBe("conversation-backfill"); + expect(parsed.steps[1].result.status).toBe("ok"); + expect(parsed.okCount).toBe(2); expect(parsed.errorCount).toBe(0); }); }); @@ -203,14 +207,19 @@ describe("assistant db repair — corrupt DB", () => { /(integrity violation|database is too corrupt|database disk image is malformed)/, ); expect(stdout).not.toContain("this is a bug"); - expect(stdout).toMatch(/Done\. 1 step ran: 0 ok, 1 failed/); + // Backfill still attempts to run after integrity check fails (continue + // on non-halting error). On the rollback-journal-mode corrupt seed + // backfill itself runs against an empty conversations dir and reports + // "nothing to backfill", so the summary is `1 ok, 1 failed`. + expect(stdout).toMatch(/Done\. 2 steps ran: 1 ok, 1 failed/); }); - test("--json carries the full error list", async () => { + test("--json carries the full error list from integrity-check", async () => { seedCorruptDb(); const { stdout, exitCode } = await runRepair(["--json", "repair"]); expect(exitCode).toBe(1); const parsed = JSON.parse(stdout); + expect(parsed.steps[0].name).toBe("integrity-check"); expect(parsed.steps[0].result.status).toBe("error"); expect(parsed.steps[0].result.data).toBeDefined(); expect(Array.isArray(parsed.steps[0].result.data.errors)).toBe(true); @@ -353,3 +362,179 @@ describe("repair step runner", () => { expect(report.steps[0].result.durationMs).toBeGreaterThanOrEqual(0); }); }); + +// --------------------------------------------------------------------------- +// Integrity step — open failures +// --------------------------------------------------------------------------- + +describe("integrity-check step — open failures", () => { + test("file-is-a-directory surfaces as a structured error", async () => { + // Make the db path itself a directory, so SQLite can't open it as a + // file. The constructor throws; without our explicit catch the runner + // would mark this as "this is a bug". + rmSync(dbPath, { force: true }); + mkdirSync(dbPath, { recursive: true }); + + const { stdout, exitCode } = await runRepair(["--json", "repair"]); + expect(exitCode).toBe(1); + const parsed = JSON.parse(stdout); + expect(parsed.steps[0].name).toBe("integrity-check"); + expect(parsed.steps[0].result.status).toBe("error"); + expect(parsed.steps[0].result.data.openFailed).toBe(true); + expect(parsed.steps[0].result.summary).toContain("could not open"); + // Crucially: not flagged as a bug. + const allDetail = JSON.stringify(parsed); + expect(allDetail).not.toContain("this is a bug"); + }); +}); + +// --------------------------------------------------------------------------- +// Conversation-backfill step +// --------------------------------------------------------------------------- + +/** + * Seed a `/conversations//` directory pair on disk for the + * backfill step to discover. Returns the conversation id used. + */ +function seedDiskConversation( + opts: { + id?: string; + title?: string; + messages?: Array<{ role: string; content: string; ts?: string }>; + } = {}, +): string { + const id = opts.id ?? "conv-disk-1"; + const dir = join(workspaceDir, "conversations", id); + mkdirSync(dir, { recursive: true }); + writeFileSync( + join(dir, "meta.json"), + JSON.stringify({ + id, + title: opts.title ?? "Backfilled", + createdAt: "2025-01-01T00:00:00.000Z", + updatedAt: "2025-01-01T00:01:00.000Z", + }), + ); + if (opts.messages) { + const lines = opts.messages + .map((m) => + JSON.stringify({ role: m.role, content: m.content, ts: m.ts }), + ) + .join("\n"); + writeFileSync(join(dir, "messages.jsonl"), lines + "\n"); + } + return id; +} + +/** Apply the schema to the test DB so backfill has tables to insert into. */ +async function initSchema(): Promise { + // The repair step opens its own bun:sqlite handle but expects the schema + // to already exist (production-wise, the daemon creates it). Touching the + // global init triggers schema creation against the env-isolated path. + const { initializeDb } = await import("../../../../memory/db-init.js"); + initializeDb(); + // Close the singleton so backfill can open its own handle without + // collision. WAL allows concurrent handles but cleaner ownership avoids + // test cross-talk through the in-process cache. + const { getDb, getSqliteFrom } = + await import("../../../../memory/db-connection.js"); + const { clearStoredDb } = await import("../../../../memory/db-singleton.js"); + try { + getSqliteFrom(getDb()).close(); + } catch { + /* already closed */ + } + clearStoredDb(); +} + +describe("assistant db repair — conversation-backfill step", () => { + test("backfills a disk-only conversation into SQLite", async () => { + await initSchema(); + seedDiskConversation({ + id: "conv-recover-1", + title: "Recover me", + messages: [ + { role: "user", content: "hi", ts: "2025-01-01T00:00:30.000Z" }, + { role: "assistant", content: "hello", ts: "2025-01-01T00:00:31.000Z" }, + ], + }); + + const { stdout, exitCode } = await runRepair(["--json", "repair"]); + expect(exitCode).toBe(0); + const parsed = JSON.parse(stdout); + const backfill = parsed.steps[1]; + expect(backfill.name).toBe("conversation-backfill"); + expect(backfill.result.status).toBe("ok"); + expect(backfill.result.data.recovered).toBe(1); + expect(backfill.result.data.errors).toBe(0); + expect(backfill.result.data.skipped).toBe(0); + + // And confirm it actually landed in SQLite. + const verify = new Database(dbPath, { readonly: true }); + try { + const row = verify + .query< + { id: string; title: string }, + [] + >("SELECT id, title FROM conversations WHERE id = 'conv-recover-1'") + .get(); + expect(row?.id).toBe("conv-recover-1"); + expect(row?.title).toBe("Recover me"); + const msgCount = verify + .query< + { n: number }, + [] + >("SELECT COUNT(*) AS n FROM messages WHERE conversation_id = 'conv-recover-1'") + .get(); + expect(msgCount?.n).toBe(2); + } finally { + verify.close(); + } + }); + + test("skips conversations already present (idempotent)", async () => { + await initSchema(); + seedDiskConversation({ id: "conv-idempotent-1" }); + + // First run: recover. + let { stdout } = await runRepair(["--json", "repair"]); + let parsed = JSON.parse(stdout); + expect(parsed.steps[1].result.data.recovered).toBe(1); + + // Second run: should skip. + ({ stdout } = await runRepair(["--json", "repair"])); + parsed = JSON.parse(stdout); + expect(parsed.steps[1].result.data.recovered).toBe(0); + expect(parsed.steps[1].result.data.skipped).toBe(1); + expect(parsed.steps[1].result.status).toBe("ok"); + }); + + test("reports nothing-to-backfill on an empty conversations dir", async () => { + await initSchema(); + // No seedDiskConversation call. + const { stdout, exitCode } = await runRepair(["--json", "repair"]); + expect(exitCode).toBe(0); + const parsed = JSON.parse(stdout); + expect(parsed.steps[1].result.status).toBe("ok"); + expect(parsed.steps[1].result.summary).toContain("nothing to backfill"); + expect(parsed.steps[1].result.data.recovered).toBe(0); + }); + + test("surfaces warnings for malformed meta.json without erroring the step", async () => { + await initSchema(); + const dir = join(workspaceDir, "conversations", "broken-1"); + mkdirSync(dir, { recursive: true }); + writeFileSync(join(dir, "meta.json"), "{ not valid json"); + + const { stdout, exitCode } = await runRepair(["--json", "repair"]); + expect(exitCode).toBe(0); + const parsed = JSON.parse(stdout); + expect(parsed.steps[1].result.status).toBe("ok"); + expect(parsed.steps[1].result.data.skipped).toBe(1); + expect( + parsed.steps[1].result.data.warnings.some((w: string) => + w.includes("malformed meta.json"), + ), + ).toBe(true); + }); +}); diff --git a/assistant/src/cli/commands/db/repair-step-conversation-backfill.ts b/assistant/src/cli/commands/db/repair-step-conversation-backfill.ts new file mode 100644 index 00000000000..f2778ec1d53 --- /dev/null +++ b/assistant/src/cli/commands/db/repair-step-conversation-backfill.ts @@ -0,0 +1,345 @@ +/** + * Repair step: conversation backfill from the on-disk view. + * + * Each conversation directory under `/conversations//` holds + * a `meta.json` and `messages.jsonl` written by the runtime as the source + * of truth for the disk view. If the SQLite database was wiped, restored + * from an old backup, or otherwise lost the `conversations`/`messages` + * rows, this step replays the on-disk files to reconstruct them. + * + * Workspace migration 028 performs the same kind of recovery at startup, + * but its body is a frozen snapshot that runs against historical + * workspaces. This step owns its own copy so the live `db repair` surface + * can evolve independently — bug fixes, new edge cases, or schema changes + * don't risk altering migration 028's behavior on workspaces that have + * already run it. + * + * This step opens its own read-write bun:sqlite handle so the command + * works when the daemon is down — the whole point of the local transport. + * + * Idempotent: existing conversation rows are skipped without modification. + */ + +import { randomUUID } from "node:crypto"; +import { existsSync, readdirSync, readFileSync, statSync } from "node:fs"; +import { join } from "node:path"; +import { Database } from "bun:sqlite"; + +import { eq } from "drizzle-orm"; +import { drizzle } from "drizzle-orm/bun-sqlite"; + +import * as schema from "../../../memory/schema.js"; +import { + conversations, + messages, +} from "../../../memory/schema/conversations.js"; +import { getWorkspaceDir } from "../../../util/platform.js"; +import type { RepairContext, RepairStep, StepResult } from "./repair-steps.js"; + +/** + * Cap on warning lines surfaced in the human-mode output. The JSON payload + * carries the full list (subject to `WARNING_CAP_TOTAL`) so scripted callers + * never lose detail. + */ +const MAX_REPORTED_WARNING_LINES = 20; + +/** + * Hard cap on warnings retained in memory, even for the JSON payload. + * Prevents a workspace with thousands of malformed entries from blowing + * memory on the report object. + */ +const WARNING_CAP_TOTAL = 500; + +// --------------------------------------------------------------------------- +// On-disk record shapes +// --------------------------------------------------------------------------- + +interface DiskMeta { + id: string; + title?: string; + type?: string; + channel?: string; + createdAt?: string; + updatedAt?: string; +} + +interface DiskToolCall { + name?: string; + input?: unknown; +} + +interface DiskToolResult { + content?: unknown; +} + +interface DiskMessageRecord { + role: string; + ts?: string; + content?: string; + toolCalls?: DiskToolCall[]; + toolResults?: DiskToolResult[]; + attachments?: unknown[]; +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function parseEpochMs(isoString: string | undefined): number | null { + if (!isoString) return null; + const ms = new Date(isoString).getTime(); + return Number.isNaN(ms) ? null : ms; +} + +function buildContentBlocks(record: DiskMessageRecord): unknown[] { + const blocks: unknown[] = []; + + if (record.content) { + blocks.push({ type: "text", text: record.content }); + } + + if (Array.isArray(record.toolCalls)) { + for (const tc of record.toolCalls) { + blocks.push({ + type: "tool_use", + id: randomUUID(), + name: tc.name ?? "unknown", + input: tc.input ?? {}, + }); + } + } + + if (Array.isArray(record.toolResults)) { + for (const tr of record.toolResults) { + blocks.push({ + type: "tool_result", + tool_use_id: "", + content: + typeof tr.content === "string" + ? tr.content + : JSON.stringify(tr.content), + }); + } + } + + // content column is NOT NULL — ensure at least one block + if (blocks.length === 0) { + blocks.push({ type: "text", text: "" }); + } + + return blocks; +} + +// --------------------------------------------------------------------------- +// Step body +// --------------------------------------------------------------------------- + +async function runConversationBackfill( + ctx: RepairContext, +): Promise { + // Open RW so we can insert recovered rows. Mirror the daemon's pragmas + // for consistent journal/FK behavior. + let sqlite: Database; + try { + sqlite = new Database(ctx.dbPath); + sqlite.exec("PRAGMA journal_mode=WAL"); + sqlite.exec("PRAGMA synchronous=FULL"); + sqlite.exec("PRAGMA busy_timeout=5000"); + sqlite.exec("PRAGMA foreign_keys = ON"); + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + return { + status: "error", + summary: "could not open database for conversation backfill", + detailLines: [msg], + data: { + recovered: 0, + skipped: 0, + errors: 1, + warnings: [msg], + openFailed: true, + }, + }; + } + + try { + const db = drizzle(sqlite, { schema }); + const workspaceDir = getWorkspaceDir(); + + let recovered = 0; + let skipped = 0; + let errors = 0; + const warnings: string[] = []; + const pushWarning = (line: string): void => { + if (warnings.length < WARNING_CAP_TOTAL) warnings.push(line); + }; + + const conversationsDir = join(workspaceDir, "conversations"); + if (existsSync(conversationsDir)) { + let entries: string[]; + try { + entries = readdirSync(conversationsDir); + } catch (err) { + pushWarning(`failed to read conversations directory: ${String(err)}`); + entries = []; + } + + for (const entry of entries) { + const dirPath = join(conversationsDir, entry); + + try { + if (!statSync(dirPath).isDirectory()) continue; + } catch { + continue; + } + + const metaPath = join(dirPath, "meta.json"); + if (!existsSync(metaPath)) { + pushWarning(`${entry}: missing meta.json`); + skipped++; + continue; + } + + let meta: DiskMeta; + try { + meta = JSON.parse(readFileSync(metaPath, "utf-8")) as DiskMeta; + } catch (err) { + pushWarning(`${entry}: malformed meta.json: ${String(err)}`); + skipped++; + continue; + } + + if (!meta.id) { + pushWarning(`${entry}: meta.json missing id`); + skipped++; + continue; + } + + const existing = db + .select() + .from(conversations) + .where(eq(conversations.id, meta.id)) + .get(); + + if (existing) { + skipped++; + continue; + } + + const messageRecords: DiskMessageRecord[] = []; + const messagesPath = join(dirPath, "messages.jsonl"); + if (existsSync(messagesPath)) { + try { + const raw = readFileSync(messagesPath, "utf-8"); + for (const line of raw.split("\n")) { + const trimmed = line.trim(); + if (!trimmed) continue; + try { + messageRecords.push(JSON.parse(trimmed) as DiskMessageRecord); + } catch { + pushWarning(`${entry}: malformed JSONL line in messages.jsonl`); + } + } + } catch (err) { + pushWarning( + `${entry}: failed to read messages.jsonl: ${String(err)}`, + ); + } + } + + const createdAt = parseEpochMs(meta.createdAt) ?? Date.now(); + const updatedAt = parseEpochMs(meta.updatedAt) ?? createdAt; + + try { + db.transaction((tx) => { + tx.insert(conversations) + .values({ + id: meta.id, + title: meta.title ?? null, + createdAt, + updatedAt, + conversationType: meta.type ?? "standard", + originChannel: meta.channel ?? null, + source: "user", + memoryScopeId: "default", + isAutoTitle: 1, + totalInputTokens: 0, + totalOutputTokens: 0, + totalEstimatedCost: 0, + contextSummary: null, + contextCompactedMessageCount: 0, + contextCompactedAt: null, + originInterface: null, + forkParentConversationId: null, + forkParentMessageId: null, + scheduleJobId: null, + }) + .run(); + + for (const record of messageRecords) { + const contentBlocks = buildContentBlocks(record); + const msgCreatedAt = parseEpochMs(record.ts) ?? createdAt; + + tx.insert(messages) + .values({ + id: randomUUID(), + conversationId: meta.id, + role: record.role, + content: JSON.stringify(contentBlocks), + createdAt: msgCreatedAt, + metadata: null, + }) + .run(); + } + }); + recovered++; + } catch (err) { + pushWarning(`${meta.id} (${entry}): insert failed: ${String(err)}`); + errors++; + } + } + } + + const summary = + recovered === 0 && errors === 0 + ? `nothing to backfill (${skipped} on-disk conversation${skipped === 1 ? "" : "s"} already present)` + : `recovered ${recovered}, skipped ${skipped}, ${errors} error${errors === 1 ? "" : "s"}`; + + const truncatedWarnings = warnings.slice(0, MAX_REPORTED_WARNING_LINES); + const detailLines = + warnings.length > MAX_REPORTED_WARNING_LINES + ? [ + ...truncatedWarnings, + `+ ${warnings.length - MAX_REPORTED_WARNING_LINES} more (use --json for full list)`, + ] + : truncatedWarnings; + + // Errors during insert are surfaced as a non-halting failure so later + // steps still run. Warnings without errors (malformed JSONL lines, + // missing meta.json) are not themselves a failure — they're skips. + if (errors > 0) { + return { + status: "error", + summary, + detailLines, + data: { recovered, skipped, errors, warnings }, + }; + } + + return { + status: "ok", + summary, + detailLines, + data: { recovered, skipped, errors, warnings }, + }; + } finally { + sqlite.close(); + } +} + +export const conversationBackfillStep: RepairStep = { + name: "conversation-backfill", + description: + "Replay workspace/conversations//{meta.json,messages.jsonl} into SQLite", + run: runConversationBackfill, +}; diff --git a/assistant/src/cli/commands/db/repair-step-integrity.ts b/assistant/src/cli/commands/db/repair-step-integrity.ts index 5af127b2774..30e8a16d5e2 100644 --- a/assistant/src/cli/commands/db/repair-step-integrity.ts +++ b/assistant/src/cli/commands/db/repair-step-integrity.ts @@ -15,14 +15,13 @@ * please tell me everything that's wrong". * * The step never mutates the database. If corruption is found the step - * returns a non-halting error — subsequent steps (conversation backfill, - * etc.) may still produce useful work even on a partially-corrupt DB. + * returns a non-halting error — subsequent steps may still produce useful + * work even on a partially-corrupt DB. */ import { Database } from "bun:sqlite"; import type { RepairContext, RepairStep, StepResult } from "./repair-steps.js"; -import { withDb } from "./repair-steps.js"; /** * Maximum number of corruption error lines to surface in the human output @@ -33,76 +32,95 @@ import { withDb } from "./repair-steps.js"; const MAX_REPORTED_ERROR_LINES = 20; async function runIntegrityCheck(ctx: RepairContext): Promise { - return withDb( - () => new Database(ctx.dbPath, { readonly: true }), - async (db) => { - // `PRAGMA integrity_check` returns rows of a single TEXT column also - // named `integrity_check`. When the DB is healthy this is exactly one - // row whose value is the literal "ok"; any other shape means errors. - // - // Severely corrupted DBs (header damaged, b-tree root unreadable) - // can cause the pragma itself to throw "database disk image is - // malformed" before yielding any rows. We catch that and surface it - // as the corruption signal it actually is — it's not a bug in the - // step, it's the DB telling us it's structurally invalid. - let messages: string[]; - try { - const rows = db - .query<{ integrity_check: string }, []>("PRAGMA integrity_check") - .all(); - messages = rows.map((r) => r.integrity_check); - } catch (err) { - const msg = err instanceof Error ? err.message : String(err); - const pageCount = safePageCount(db); - return { - status: "error", - summary: "database is too corrupt to complete integrity check", - detailLines: [ - msg, - `page count: ${pageCount.toLocaleString("en-US")}`, - ], - data: { - pageCount, - errorCount: 1, - errors: [msg], - checkFailed: true, - }, - }; - } + // Open-failure (file unreadable, file is a directory, header so corrupt + // SQLite refuses to attach) needs to surface as a normal step error, not + // get caught by the runner's generic "step threw — this is a bug" + // fallback. Catch the constructor failure and convert it into a + // structured diagnostic. + let db: Database; + try { + db = new Database(ctx.dbPath, { readonly: true }); + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + return { + status: "error", + summary: "could not open database for integrity check", + detailLines: [msg], + data: { + pageCount: 0, + errorCount: 1, + errors: [msg], + openFailed: true, + }, + }; + } - const healthy = messages.length === 1 && messages[0] === "ok"; + try { + // `PRAGMA integrity_check` returns rows of a single TEXT column also + // named `integrity_check`. When the DB is healthy this is exactly one + // row whose value is the literal "ok"; any other shape means errors. + // + // Severely corrupted DBs (header damaged, b-tree root unreadable) can + // cause the pragma itself to throw "database disk image is malformed" + // before yielding any rows. Catch that and surface it as the + // corruption signal it actually is — not a step bug, the DB telling + // us it's structurally invalid. + let messages: string[]; + try { + const rows = db + .query<{ integrity_check: string }, []>("PRAGMA integrity_check") + .all(); + messages = rows.map((r) => r.integrity_check); + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); const pageCount = safePageCount(db); - - if (healthy) { - return { - status: "ok", - summary: "no corruption detected", - detailLines: [`scanned ${pageCount.toLocaleString("en-US")} pages`], - data: { pageCount, errorCount: 0 }, - }; - } - - const truncated = messages.slice(0, MAX_REPORTED_ERROR_LINES); - const detailLines = - messages.length > MAX_REPORTED_ERROR_LINES - ? [ - ...truncated, - `+ ${messages.length - MAX_REPORTED_ERROR_LINES} more (use --json for full list)`, - ] - : truncated; - return { status: "error", - summary: `${messages.length} integrity violation${messages.length === 1 ? "" : "s"} reported`, - detailLines, + summary: "database is too corrupt to complete integrity check", + detailLines: [msg, `page count: ${pageCount.toLocaleString("en-US")}`], data: { pageCount, - errorCount: messages.length, - errors: messages, + errorCount: 1, + errors: [msg], + checkFailed: true, }, }; - }, - ); + } + + const healthy = messages.length === 1 && messages[0] === "ok"; + const pageCount = safePageCount(db); + + if (healthy) { + return { + status: "ok", + summary: "no corruption detected", + detailLines: [`scanned ${pageCount.toLocaleString("en-US")} pages`], + data: { pageCount, errorCount: 0 }, + }; + } + + const truncated = messages.slice(0, MAX_REPORTED_ERROR_LINES); + const detailLines = + messages.length > MAX_REPORTED_ERROR_LINES + ? [ + ...truncated, + `+ ${messages.length - MAX_REPORTED_ERROR_LINES} more (use --json for full list)`, + ] + : truncated; + + return { + status: "error", + summary: `${messages.length} integrity violation${messages.length === 1 ? "" : "s"} reported`, + detailLines, + data: { + pageCount, + errorCount: messages.length, + errors: messages, + }, + }; + } finally { + db.close(); + } } /** @@ -110,7 +128,7 @@ async function runIntegrityCheck(ctx: RepairContext): Promise { * from the header), but on truly malformed files it can throw too. Wrap * it so the integrity step always has a number to report. */ -function safePageCount(db: import("bun:sqlite").Database): number { +function safePageCount(db: Database): number { try { return ( db.query<{ page_count: number }, []>("PRAGMA page_count").get() diff --git a/assistant/src/cli/commands/db/repair-steps.ts b/assistant/src/cli/commands/db/repair-steps.ts index 03f810fe4b3..50e63954714 100644 --- a/assistant/src/cli/commands/db/repair-steps.ts +++ b/assistant/src/cli/commands/db/repair-steps.ts @@ -1,34 +1,21 @@ /** * Step framework for `assistant db repair`. * - * `repair` is conceptually a sequence of discrete remediation passes: + * `repair` runs a sequence of discrete remediation passes. Each step is a + * `RepairStep` with a `name`, a one-line `description`, and a `run(ctx)` + * that returns a `StepResult`. * - * 1. integrity check (this PR) - * 2. conversation backfill (next PR — replay /workspace/conversations - * into SQLite) - * 3. … more to come (memory consolidation, lost-and-found triage, etc.) + * The runner: + * - executes steps sequentially in the order supplied + * - continues past non-halting failures (a corrupt DB doesn't preclude + * re-deriving conversations from disk) + * - stops only when a step returns `halt: true` + * - never throws — uncaught errors from a step body are captured as a + * synthetic `error` result so a bug in one step can't crash the run * - * Each step is a small unit that: - * - logs a "starting" line when it begins - * - produces a `StepResult` describing what happened - * - logs a single "success" or "error" summary line with details - * - * The runner is intentionally not clever: - * - steps run sequentially (later steps may depend on earlier ones; in - * particular, a step that mutates the DB needs preceding integrity-check - * results to be visible) - * - a failed step does NOT halt the sequence by default. Repair is a - * best-effort surface — a corrupted DB doesn't mean we should skip - * re-deriving conversations from disk. Steps that genuinely cannot - * continue on failure mark themselves `halt: true`. - * - the runner never throws; every error is captured into a `StepResult` - * so callers can render a coherent summary - * - * `RepairContext` holds the per-run state every step shares — the DB path - * and any opened handles. Steps may open their own bun:sqlite connections - * (e.g. integrity check opens read-only) rather than holding one open at - * the context level; future write-side steps will need to open RW - * themselves anyway. + * `RepairContext` holds the per-run state every step shares — currently + * just the DB path. Steps open their own bun:sqlite connections (read-only + * or read-write as needed) rather than sharing a handle through the context. */ import type { Database } from "bun:sqlite"; diff --git a/assistant/src/cli/commands/db/repair.ts b/assistant/src/cli/commands/db/repair.ts index f08127162e3..76a4da27a8e 100644 --- a/assistant/src/cli/commands/db/repair.ts +++ b/assistant/src/cli/commands/db/repair.ts @@ -4,12 +4,10 @@ * Composes the step framework in `repair-steps.ts` with the concrete steps * imported from `repair-step-*.ts` files. Each step logs its own * starting/success/error lines; the runner aggregates results into a - * `RepairReport` that we render either as plain text or as a single JSON + * `RepairReport` that renders either as plain text or as a single JSON * payload (`--json`). * - * This PR ships exactly one step (integrity-check). Subsequent PRs append - * more steps to the `STEPS` array — that's the only edit they need to make - * here. + * Adding a new step is a one-line edit to the `STEPS` array. * * Transport: `local`. The whole point of this surface is that it works when * the daemon is down, so it opens the DB file directly and never goes @@ -23,6 +21,7 @@ import type { Command } from "commander"; import { getDbPath } from "../../../util/platform.js"; import { dim, green, red } from "../../lib/cli-colors.js"; import { shouldOutputJson, writeOutput } from "../../output.js"; +import { conversationBackfillStep } from "./repair-step-conversation-backfill.js"; import { integrityCheckStep } from "./repair-step-integrity.js"; import type { RepairReport, RepairStep, StepResult } from "./repair-steps.js"; import { formatDurationMs, runRepairSteps } from "./repair-steps.js"; @@ -32,13 +31,10 @@ import { formatDurationMs, runRepairSteps } from "./repair-steps.js"; // --------------------------------------------------------------------------- /** - * Repair steps run in the order listed here. Order matters: - * 1. integrity-check FIRST — surfaces structural damage before we attempt - * anything that touches the same pages - * 2. (future) conversation backfill from /workspace/conversations - * 3. (future) memory consolidation, lost-and-found triage, etc. + * Repair steps run in the order listed here. Integrity check runs first so + * structural damage surfaces before subsequent steps touch the same pages. */ -const STEPS: RepairStep[] = [integrityCheckStep]; +const STEPS: RepairStep[] = [integrityCheckStep, conversationBackfillStep]; // --------------------------------------------------------------------------- // Rendering diff --git a/assistant/src/workspace/migrations/028-recover-conversations-from-disk-view.ts b/assistant/src/workspace/migrations/028-recover-conversations-from-disk-view.ts index 6dbf2d18d39..257c906bdf5 100644 --- a/assistant/src/workspace/migrations/028-recover-conversations-from-disk-view.ts +++ b/assistant/src/workspace/migrations/028-recover-conversations-from-disk-view.ts @@ -133,9 +133,7 @@ export const recoverConversationsFromDiskViewMigration: WorkspaceMigration = { // Read and parse meta.json const metaPath = join(dirPath, "meta.json"); if (!existsSync(metaPath)) { - log.warn( - `Skipping ${entry}: missing meta.json`, - ); + log.warn(`Skipping ${entry}: missing meta.json`); skipped++; continue; } @@ -144,17 +142,13 @@ export const recoverConversationsFromDiskViewMigration: WorkspaceMigration = { try { meta = JSON.parse(readFileSync(metaPath, "utf-8")) as DiskMeta; } catch (err) { - log.warn( - `Skipping ${entry}: malformed meta.json: ${err}`, - ); + log.warn(`Skipping ${entry}: malformed meta.json: ${err}`); skipped++; continue; } if (!meta.id) { - log.warn( - `Skipping ${entry}: meta.json missing id`, - ); + log.warn(`Skipping ${entry}: meta.json missing id`); skipped++; continue; } @@ -182,9 +176,7 @@ export const recoverConversationsFromDiskViewMigration: WorkspaceMigration = { const trimmed = line.trim(); if (!trimmed) continue; try { - messageRecords.push( - JSON.parse(trimmed) as DiskMessageRecord, - ); + messageRecords.push(JSON.parse(trimmed) as DiskMessageRecord); } catch { log.warn( `Skipping malformed JSONL line in ${entry}/messages.jsonl`, @@ -192,9 +184,7 @@ export const recoverConversationsFromDiskViewMigration: WorkspaceMigration = { } } } catch (err) { - log.warn( - `Failed to read messages.jsonl for ${entry}: ${err}`, - ); + log.warn(`Failed to read messages.jsonl for ${entry}: ${err}`); } } @@ -231,8 +221,7 @@ export const recoverConversationsFromDiskViewMigration: WorkspaceMigration = { for (const record of messageRecords) { const contentBlocks = buildContentBlocks(record); - const msgCreatedAt = - parseEpochMs(record.ts) ?? createdAt; + const msgCreatedAt = parseEpochMs(record.ts) ?? createdAt; tx.insert(messages) .values({ @@ -249,9 +238,7 @@ export const recoverConversationsFromDiskViewMigration: WorkspaceMigration = { recovered++; } catch (err) { - log.warn( - `Failed to insert conversation ${meta.id} (${entry}): ${err}`, - ); + log.warn(`Failed to insert conversation ${meta.id} (${entry}): ${err}`); errors++; } }