From 87c1265e12d1f044784b726e2f9d183e3176ec53 Mon Sep 17 00:00:00 2001 From: "vellum-apollo-bot[bot]" <242025090+vellum-apollo-bot[bot]@users.noreply.github.com> Date: Fri, 29 May 2026 22:32:57 +0000 Subject: [PATCH 1/2] =?UTF-8?q?feat(cli):=20assistant=20db=20repair=20?= =?UTF-8?q?=E2=80=94=20conversation-backfill=20step=20+=20integrity-step?= =?UTF-8?q?=20open-failure=20catch?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the second step to the `assistant db repair` sequence — replays `<workspace>/conversations/<id>/{meta.json,messages.jsonl}` into the SQLite conversations/messages tables so a wiped or restored-from-old-backup database can be rebuilt from the on-disk view. Architecture: the recovery body lives in a new shared module `workspace/recovery/conversations-from-disk.ts` that takes a drizzle handle + workspace dir and returns `{ recovered, skipped, errors, warnings }`. Two callers consume it: 1. workspace migration 028 — runs once at startup against the daemon's global `getDb()` (rewritten from 271 → 46 lines, delegates to the shared function) 2. `db repair` conversation-backfill step — opens its own RW bun:sqlite handle with the same pragmas as the daemon, wraps it in drizzle, calls the shared function Idempotent: the per-conversation existence check guards both call sites. Malformed `meta.json` / `messages.jsonl` lines are skipped with warnings (capped at 20 in human output, full list in --json up to a 500-entry memory cap). Two follow-ups from PR #32632 review folded in: - Vargas: dropped `(this PR)` / `(next PR)` / `(future)` PR-chronology callouts from `repair-steps.ts` and `repair.ts` module docs and from the `STEPS` comment. Rewritten to describe the abstraction (sequence of steps, append by extending the array) rather than the timeline. Codified in the software-engineering skill's `comments.md` as a lesson entry. 
- Codex P2: `integrity-check` step now catches `new Database(…)` failures (file-is-a-directory, unreadable file, header so broken SQLite refuses to attach) and surfaces them as a structured `status: "error"` with `data.openFailed: true` rather than letting the runner's generic "this is a bug" fallback eat it. Tests: 16 unit tests in `repair.test.ts` (11 carried, 5 new — 1 open-failure + 4 backfill: disk-only convo backfills + verifies SQLite rows, idempotency on second run, empty-conversations-dir nothing-to- backfill summary, malformed meta.json surfaced as a warning without erroring the step). Migration 028's 10 tests all still pass against the refactored delegator. Smoke-tested on the live ~4 GB workspace DB: [1/2] integrity-check — ok no corruption detected (40.1s) scanned 993,829 pages [2/2] conversation-backfill — ok nothing to backfill (773 on-disk conversations already present) (1.5s) Done. 2 steps ran: 2 ok, 0 failed real 0m42.483s --- .../cli/commands/db/__tests__/repair.test.ts | 197 +++++++++++- .../db/repair-step-conversation-backfill.ts | 134 ++++++++ .../cli/commands/db/repair-step-integrity.ts | 150 +++++---- assistant/src/cli/commands/db/repair-steps.ts | 39 +-- assistant/src/cli/commands/db/repair.ts | 16 +- ...28-recover-conversations-from-disk-view.ts | 239 +------------- .../recovery/conversations-from-disk.ts | 295 ++++++++++++++++++ 7 files changed, 730 insertions(+), 340 deletions(-) create mode 100644 assistant/src/cli/commands/db/repair-step-conversation-backfill.ts create mode 100644 assistant/src/workspace/recovery/conversations-from-disk.ts diff --git a/assistant/src/cli/commands/db/__tests__/repair.test.ts b/assistant/src/cli/commands/db/__tests__/repair.test.ts index 88103de903d..e54cdd19d36 100644 --- a/assistant/src/cli/commands/db/__tests__/repair.test.ts +++ b/assistant/src/cli/commands/db/__tests__/repair.test.ts @@ -19,6 +19,7 @@ import { mkdtempSync, openSync, rmSync, + writeFileSync, writeSync, } from "node:fs"; import { 
tmpdir } from "node:os"; @@ -169,21 +170,24 @@ describe("assistant db repair — healthy DB", () => { expect(stdout).toContain("integrity-check"); expect(stdout).toContain("ok"); expect(stdout).toContain("no corruption detected"); - expect(stdout).toMatch(/Done\. 1 step ran: 1 ok, 0 failed/); + expect(stdout).toContain("conversation-backfill"); + expect(stdout).toMatch(/Done\. 2 steps ran: 2 ok, 0 failed/); }); - test("--json emits a structured report with the step result", async () => { + test("--json emits a structured report with all step results", async () => { seedHealthyDb(); const { stdout, exitCode } = await runRepair(["--json", "repair"]); expect(exitCode).toBe(0); const parsed = JSON.parse(stdout); expect(parsed.dbPath).toBe(dbPath); - expect(parsed.steps).toHaveLength(1); + expect(parsed.steps).toHaveLength(2); expect(parsed.steps[0].name).toBe("integrity-check"); expect(parsed.steps[0].result.status).toBe("ok"); expect(parsed.steps[0].result.data.errorCount).toBe(0); expect(typeof parsed.steps[0].result.durationMs).toBe("number"); - expect(parsed.okCount).toBe(1); + expect(parsed.steps[1].name).toBe("conversation-backfill"); + expect(parsed.steps[1].result.status).toBe("ok"); + expect(parsed.okCount).toBe(2); expect(parsed.errorCount).toBe(0); }); }); @@ -203,14 +207,19 @@ describe("assistant db repair — corrupt DB", () => { /(integrity violation|database is too corrupt|database disk image is malformed)/, ); expect(stdout).not.toContain("this is a bug"); - expect(stdout).toMatch(/Done\. 1 step ran: 0 ok, 1 failed/); + // Backfill still attempts to run after integrity check fails (continue + // on non-halting error). On the rollback-journal-mode corrupt seed + // backfill itself runs against an empty conversations dir and reports + // "nothing to backfill", so the summary is `1 ok, 1 failed`. + expect(stdout).toMatch(/Done\. 
2 steps ran: 1 ok, 1 failed/); }); - test("--json carries the full error list", async () => { + test("--json carries the full error list from integrity-check", async () => { seedCorruptDb(); const { stdout, exitCode } = await runRepair(["--json", "repair"]); expect(exitCode).toBe(1); const parsed = JSON.parse(stdout); + expect(parsed.steps[0].name).toBe("integrity-check"); expect(parsed.steps[0].result.status).toBe("error"); expect(parsed.steps[0].result.data).toBeDefined(); expect(Array.isArray(parsed.steps[0].result.data.errors)).toBe(true); @@ -353,3 +362,179 @@ describe("repair step runner", () => { expect(report.steps[0].result.durationMs).toBeGreaterThanOrEqual(0); }); }); + +// --------------------------------------------------------------------------- +// Integrity step — open failures +// --------------------------------------------------------------------------- + +describe("integrity-check step — open failures", () => { + test("file-is-a-directory surfaces as a structured error", async () => { + // Make the db path itself a directory, so SQLite can't open it as a + // file. The constructor throws; without our explicit catch the runner + // would mark this as "this is a bug". + rmSync(dbPath, { force: true }); + mkdirSync(dbPath, { recursive: true }); + + const { stdout, exitCode } = await runRepair(["--json", "repair"]); + expect(exitCode).toBe(1); + const parsed = JSON.parse(stdout); + expect(parsed.steps[0].name).toBe("integrity-check"); + expect(parsed.steps[0].result.status).toBe("error"); + expect(parsed.steps[0].result.data.openFailed).toBe(true); + expect(parsed.steps[0].result.summary).toContain("could not open"); + // Crucially: not flagged as a bug. 
+ const allDetail = JSON.stringify(parsed); + expect(allDetail).not.toContain("this is a bug"); + }); +}); + +// --------------------------------------------------------------------------- +// Conversation-backfill step +// --------------------------------------------------------------------------- + +/** + * Seed a `/conversations//` directory pair on disk for the + * backfill step to discover. Returns the conversation id used. + */ +function seedDiskConversation( + opts: { + id?: string; + title?: string; + messages?: Array<{ role: string; content: string; ts?: string }>; + } = {}, +): string { + const id = opts.id ?? "conv-disk-1"; + const dir = join(workspaceDir, "conversations", id); + mkdirSync(dir, { recursive: true }); + writeFileSync( + join(dir, "meta.json"), + JSON.stringify({ + id, + title: opts.title ?? "Backfilled", + createdAt: "2025-01-01T00:00:00.000Z", + updatedAt: "2025-01-01T00:01:00.000Z", + }), + ); + if (opts.messages) { + const lines = opts.messages + .map((m) => + JSON.stringify({ role: m.role, content: m.content, ts: m.ts }), + ) + .join("\n"); + writeFileSync(join(dir, "messages.jsonl"), lines + "\n"); + } + return id; +} + +/** Apply the schema to the test DB so backfill has tables to insert into. */ +async function initSchema(): Promise { + // The repair step opens its own bun:sqlite handle but expects the schema + // to already exist (production-wise, the daemon creates it). Touching the + // global init triggers schema creation against the env-isolated path. + const { initializeDb } = await import("../../../../memory/db-init.js"); + initializeDb(); + // Close the singleton so backfill can open its own handle without + // collision. WAL allows concurrent handles but cleaner ownership avoids + // test cross-talk through the in-process cache. 
+ const { getDb, getSqliteFrom } = + await import("../../../../memory/db-connection.js"); + const { clearStoredDb } = await import("../../../../memory/db-singleton.js"); + try { + getSqliteFrom(getDb()).close(); + } catch { + /* already closed */ + } + clearStoredDb(); +} + +describe("assistant db repair — conversation-backfill step", () => { + test("backfills a disk-only conversation into SQLite", async () => { + await initSchema(); + seedDiskConversation({ + id: "conv-recover-1", + title: "Recover me", + messages: [ + { role: "user", content: "hi", ts: "2025-01-01T00:00:30.000Z" }, + { role: "assistant", content: "hello", ts: "2025-01-01T00:00:31.000Z" }, + ], + }); + + const { stdout, exitCode } = await runRepair(["--json", "repair"]); + expect(exitCode).toBe(0); + const parsed = JSON.parse(stdout); + const backfill = parsed.steps[1]; + expect(backfill.name).toBe("conversation-backfill"); + expect(backfill.result.status).toBe("ok"); + expect(backfill.result.data.recovered).toBe(1); + expect(backfill.result.data.errors).toBe(0); + expect(backfill.result.data.skipped).toBe(0); + + // And confirm it actually landed in SQLite. + const verify = new Database(dbPath, { readonly: true }); + try { + const row = verify + .query< + { id: string; title: string }, + [] + >("SELECT id, title FROM conversations WHERE id = 'conv-recover-1'") + .get(); + expect(row?.id).toBe("conv-recover-1"); + expect(row?.title).toBe("Recover me"); + const msgCount = verify + .query< + { n: number }, + [] + >("SELECT COUNT(*) AS n FROM messages WHERE conversation_id = 'conv-recover-1'") + .get(); + expect(msgCount?.n).toBe(2); + } finally { + verify.close(); + } + }); + + test("skips conversations already present (idempotent)", async () => { + await initSchema(); + seedDiskConversation({ id: "conv-idempotent-1" }); + + // First run: recover. 
+ let { stdout } = await runRepair(["--json", "repair"]); + let parsed = JSON.parse(stdout); + expect(parsed.steps[1].result.data.recovered).toBe(1); + + // Second run: should skip. + ({ stdout } = await runRepair(["--json", "repair"])); + parsed = JSON.parse(stdout); + expect(parsed.steps[1].result.data.recovered).toBe(0); + expect(parsed.steps[1].result.data.skipped).toBe(1); + expect(parsed.steps[1].result.status).toBe("ok"); + }); + + test("reports nothing-to-backfill on an empty conversations dir", async () => { + await initSchema(); + // No seedDiskConversation call. + const { stdout, exitCode } = await runRepair(["--json", "repair"]); + expect(exitCode).toBe(0); + const parsed = JSON.parse(stdout); + expect(parsed.steps[1].result.status).toBe("ok"); + expect(parsed.steps[1].result.summary).toContain("nothing to backfill"); + expect(parsed.steps[1].result.data.recovered).toBe(0); + }); + + test("surfaces warnings for malformed meta.json without erroring the step", async () => { + await initSchema(); + const dir = join(workspaceDir, "conversations", "broken-1"); + mkdirSync(dir, { recursive: true }); + writeFileSync(join(dir, "meta.json"), "{ not valid json"); + + const { stdout, exitCode } = await runRepair(["--json", "repair"]); + expect(exitCode).toBe(0); + const parsed = JSON.parse(stdout); + expect(parsed.steps[1].result.status).toBe("ok"); + expect(parsed.steps[1].result.data.skipped).toBe(1); + expect( + parsed.steps[1].result.data.warnings.some((w: string) => + w.includes("malformed meta.json"), + ), + ).toBe(true); + }); +}); diff --git a/assistant/src/cli/commands/db/repair-step-conversation-backfill.ts b/assistant/src/cli/commands/db/repair-step-conversation-backfill.ts new file mode 100644 index 00000000000..5770f63f258 --- /dev/null +++ b/assistant/src/cli/commands/db/repair-step-conversation-backfill.ts @@ -0,0 +1,134 @@ +/** + * Repair step: conversation backfill from the on-disk view. 
+ * + * Each conversation directory under `/conversations//` holds + * a `meta.json` and `messages.jsonl` written by the runtime as the source + * of truth for the disk view. If the SQLite database was wiped, restored + * from an old backup, or otherwise lost the `conversations`/`messages` + * rows, we can replay the on-disk files to reconstruct them. + * + * The core recovery logic lives in `workspace/recovery/conversations-from-disk.ts` + * and is shared with workspace migration 028 (which runs the same pass at + * startup). This step opens its own read-write bun:sqlite handle so the + * command works when the daemon is down — the whole point of the local + * transport. + * + * Idempotent: existing conversation rows are skipped without modification. + */ + +import { Database } from "bun:sqlite"; + +import { drizzle } from "drizzle-orm/bun-sqlite"; + +import * as schema from "../../../memory/schema.js"; +import { getWorkspaceDir } from "../../../util/platform.js"; +import { recoverConversationsFromDisk } from "../../../workspace/recovery/conversations-from-disk.js"; +import type { RepairContext, RepairStep, StepResult } from "./repair-steps.js"; + +/** + * Cap on warning lines surfaced in the human-mode output. The JSON payload + * carries the full list (subject to `WARNING_CAP_TOTAL`) so scripted callers + * never lose detail. + */ +const MAX_REPORTED_WARNING_LINES = 20; + +/** + * Hard cap on warnings retained in memory, even for the JSON payload. + * Prevents a workspace with thousands of malformed entries from blowing + * memory on the report object. + */ +const WARNING_CAP_TOTAL = 500; + +async function runConversationBackfill( + ctx: RepairContext, +): Promise { + // Open RW so we can insert recovered rows. Mirror the daemon's pragmas + // for consistent journal/FK behavior — anything else risks subtle drift + // between the migration path and this CLI path. 
+ let sqlite: Database; + try { + sqlite = new Database(ctx.dbPath); + sqlite.exec("PRAGMA journal_mode=WAL"); + sqlite.exec("PRAGMA synchronous=FULL"); + sqlite.exec("PRAGMA busy_timeout=5000"); + sqlite.exec("PRAGMA foreign_keys = ON"); + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + return { + status: "error", + summary: "could not open database for conversation backfill", + detailLines: [msg], + data: { + recovered: 0, + skipped: 0, + errors: 1, + warnings: [msg], + openFailed: true, + }, + }; + } + + try { + const db = drizzle(sqlite, { schema }); + const workspaceDir = getWorkspaceDir(); + + const result = recoverConversationsFromDisk(workspaceDir, db, { + warningCap: WARNING_CAP_TOTAL, + }); + + const summary = + result.recovered === 0 && result.errors === 0 + ? `nothing to backfill (${result.skipped} on-disk conversation${result.skipped === 1 ? "" : "s"} already present)` + : `recovered ${result.recovered}, skipped ${result.skipped}, ${result.errors} error${result.errors === 1 ? "" : "s"}`; + + const truncatedWarnings = result.warnings.slice( + 0, + MAX_REPORTED_WARNING_LINES, + ); + const detailLines = + result.warnings.length > MAX_REPORTED_WARNING_LINES + ? [ + ...truncatedWarnings, + `+ ${result.warnings.length - MAX_REPORTED_WARNING_LINES} more (use --json for full list)`, + ] + : truncatedWarnings; + + // Errors during insert are surfaced as a non-halting failure so later + // steps still run. Warnings without errors (malformed JSONL lines, + // missing meta.json) are not themselves a failure — they're skips. 
+ if (result.errors > 0) { + return { + status: "error", + summary, + detailLines, + data: { + recovered: result.recovered, + skipped: result.skipped, + errors: result.errors, + warnings: result.warnings, + }, + }; + } + + return { + status: "ok", + summary, + detailLines, + data: { + recovered: result.recovered, + skipped: result.skipped, + errors: result.errors, + warnings: result.warnings, + }, + }; + } finally { + sqlite.close(); + } +} + +export const conversationBackfillStep: RepairStep = { + name: "conversation-backfill", + description: + "Replay workspace/conversations//{meta.json,messages.jsonl} into SQLite", + run: runConversationBackfill, +}; diff --git a/assistant/src/cli/commands/db/repair-step-integrity.ts b/assistant/src/cli/commands/db/repair-step-integrity.ts index 5af127b2774..30e8a16d5e2 100644 --- a/assistant/src/cli/commands/db/repair-step-integrity.ts +++ b/assistant/src/cli/commands/db/repair-step-integrity.ts @@ -15,14 +15,13 @@ * please tell me everything that's wrong". * * The step never mutates the database. If corruption is found the step - * returns a non-halting error — subsequent steps (conversation backfill, - * etc.) may still produce useful work even on a partially-corrupt DB. + * returns a non-halting error — subsequent steps may still produce useful + * work even on a partially-corrupt DB. */ import { Database } from "bun:sqlite"; import type { RepairContext, RepairStep, StepResult } from "./repair-steps.js"; -import { withDb } from "./repair-steps.js"; /** * Maximum number of corruption error lines to surface in the human output @@ -33,76 +32,95 @@ import { withDb } from "./repair-steps.js"; const MAX_REPORTED_ERROR_LINES = 20; async function runIntegrityCheck(ctx: RepairContext): Promise { - return withDb( - () => new Database(ctx.dbPath, { readonly: true }), - async (db) => { - // `PRAGMA integrity_check` returns rows of a single TEXT column also - // named `integrity_check`. 
When the DB is healthy this is exactly one - // row whose value is the literal "ok"; any other shape means errors. - // - // Severely corrupted DBs (header damaged, b-tree root unreadable) - // can cause the pragma itself to throw "database disk image is - // malformed" before yielding any rows. We catch that and surface it - // as the corruption signal it actually is — it's not a bug in the - // step, it's the DB telling us it's structurally invalid. - let messages: string[]; - try { - const rows = db - .query<{ integrity_check: string }, []>("PRAGMA integrity_check") - .all(); - messages = rows.map((r) => r.integrity_check); - } catch (err) { - const msg = err instanceof Error ? err.message : String(err); - const pageCount = safePageCount(db); - return { - status: "error", - summary: "database is too corrupt to complete integrity check", - detailLines: [ - msg, - `page count: ${pageCount.toLocaleString("en-US")}`, - ], - data: { - pageCount, - errorCount: 1, - errors: [msg], - checkFailed: true, - }, - }; - } + // Open-failure (file unreadable, file is a directory, header so corrupt + // SQLite refuses to attach) needs to surface as a normal step error, not + // get caught by the runner's generic "step threw — this is a bug" + // fallback. Catch the constructor failure and convert it into a + // structured diagnostic. + let db: Database; + try { + db = new Database(ctx.dbPath, { readonly: true }); + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + return { + status: "error", + summary: "could not open database for integrity check", + detailLines: [msg], + data: { + pageCount: 0, + errorCount: 1, + errors: [msg], + openFailed: true, + }, + }; + } - const healthy = messages.length === 1 && messages[0] === "ok"; + try { + // `PRAGMA integrity_check` returns rows of a single TEXT column also + // named `integrity_check`. When the DB is healthy this is exactly one + // row whose value is the literal "ok"; any other shape means errors. 
+ // + // Severely corrupted DBs (header damaged, b-tree root unreadable) can + // cause the pragma itself to throw "database disk image is malformed" + // before yielding any rows. Catch that and surface it as the + // corruption signal it actually is — not a step bug, the DB telling + // us it's structurally invalid. + let messages: string[]; + try { + const rows = db + .query<{ integrity_check: string }, []>("PRAGMA integrity_check") + .all(); + messages = rows.map((r) => r.integrity_check); + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); const pageCount = safePageCount(db); - - if (healthy) { - return { - status: "ok", - summary: "no corruption detected", - detailLines: [`scanned ${pageCount.toLocaleString("en-US")} pages`], - data: { pageCount, errorCount: 0 }, - }; - } - - const truncated = messages.slice(0, MAX_REPORTED_ERROR_LINES); - const detailLines = - messages.length > MAX_REPORTED_ERROR_LINES - ? [ - ...truncated, - `+ ${messages.length - MAX_REPORTED_ERROR_LINES} more (use --json for full list)`, - ] - : truncated; - return { status: "error", - summary: `${messages.length} integrity violation${messages.length === 1 ? "" : "s"} reported`, - detailLines, + summary: "database is too corrupt to complete integrity check", + detailLines: [msg, `page count: ${pageCount.toLocaleString("en-US")}`], data: { pageCount, - errorCount: messages.length, - errors: messages, + errorCount: 1, + errors: [msg], + checkFailed: true, }, }; - }, - ); + } + + const healthy = messages.length === 1 && messages[0] === "ok"; + const pageCount = safePageCount(db); + + if (healthy) { + return { + status: "ok", + summary: "no corruption detected", + detailLines: [`scanned ${pageCount.toLocaleString("en-US")} pages`], + data: { pageCount, errorCount: 0 }, + }; + } + + const truncated = messages.slice(0, MAX_REPORTED_ERROR_LINES); + const detailLines = + messages.length > MAX_REPORTED_ERROR_LINES + ? 
[ + ...truncated, + `+ ${messages.length - MAX_REPORTED_ERROR_LINES} more (use --json for full list)`, + ] + : truncated; + + return { + status: "error", + summary: `${messages.length} integrity violation${messages.length === 1 ? "" : "s"} reported`, + detailLines, + data: { + pageCount, + errorCount: messages.length, + errors: messages, + }, + }; + } finally { + db.close(); + } } /** @@ -110,7 +128,7 @@ async function runIntegrityCheck(ctx: RepairContext): Promise { * from the header), but on truly malformed files it can throw too. Wrap * it so the integrity step always has a number to report. */ -function safePageCount(db: import("bun:sqlite").Database): number { +function safePageCount(db: Database): number { try { return ( db.query<{ page_count: number }, []>("PRAGMA page_count").get() diff --git a/assistant/src/cli/commands/db/repair-steps.ts b/assistant/src/cli/commands/db/repair-steps.ts index 03f810fe4b3..50e63954714 100644 --- a/assistant/src/cli/commands/db/repair-steps.ts +++ b/assistant/src/cli/commands/db/repair-steps.ts @@ -1,34 +1,21 @@ /** * Step framework for `assistant db repair`. * - * `repair` is conceptually a sequence of discrete remediation passes: + * `repair` runs a sequence of discrete remediation passes. Each step is a + * `RepairStep` with a `name`, a one-line `description`, and a `run(ctx)` + * that returns a `StepResult`. * - * 1. integrity check (this PR) - * 2. conversation backfill (next PR — replay /workspace/conversations - * into SQLite) - * 3. … more to come (memory consolidation, lost-and-found triage, etc.) 
+ * The runner: + * - executes steps sequentially in the order supplied + * - continues past non-halting failures (a corrupt DB doesn't preclude + * re-deriving conversations from disk) + * - stops only when a step returns `halt: true` + * - never throws — uncaught errors from a step body are captured as a + * synthetic `error` result so a bug in one step can't crash the run * - * Each step is a small unit that: - * - logs a "starting" line when it begins - * - produces a `StepResult` describing what happened - * - logs a single "success" or "error" summary line with details - * - * The runner is intentionally not clever: - * - steps run sequentially (later steps may depend on earlier ones; in - * particular, a step that mutates the DB needs preceding integrity-check - * results to be visible) - * - a failed step does NOT halt the sequence by default. Repair is a - * best-effort surface — a corrupted DB doesn't mean we should skip - * re-deriving conversations from disk. Steps that genuinely cannot - * continue on failure mark themselves `halt: true`. - * - the runner never throws; every error is captured into a `StepResult` - * so callers can render a coherent summary - * - * `RepairContext` holds the per-run state every step shares — the DB path - * and any opened handles. Steps may open their own bun:sqlite connections - * (e.g. integrity check opens read-only) rather than holding one open at - * the context level; future write-side steps will need to open RW - * themselves anyway. + * `RepairContext` holds the per-run state every step shares — currently + * just the DB path. Steps open their own bun:sqlite connections (read-only + * or read-write as needed) rather than sharing a handle through the context. 
*/ import type { Database } from "bun:sqlite"; diff --git a/assistant/src/cli/commands/db/repair.ts b/assistant/src/cli/commands/db/repair.ts index f08127162e3..76a4da27a8e 100644 --- a/assistant/src/cli/commands/db/repair.ts +++ b/assistant/src/cli/commands/db/repair.ts @@ -4,12 +4,10 @@ * Composes the step framework in `repair-steps.ts` with the concrete steps * imported from `repair-step-*.ts` files. Each step logs its own * starting/success/error lines; the runner aggregates results into a - * `RepairReport` that we render either as plain text or as a single JSON + * `RepairReport` that renders either as plain text or as a single JSON * payload (`--json`). * - * This PR ships exactly one step (integrity-check). Subsequent PRs append - * more steps to the `STEPS` array — that's the only edit they need to make - * here. + * Adding a new step is a one-line edit to the `STEPS` array. * * Transport: `local`. The whole point of this surface is that it works when * the daemon is down, so it opens the DB file directly and never goes @@ -23,6 +21,7 @@ import type { Command } from "commander"; import { getDbPath } from "../../../util/platform.js"; import { dim, green, red } from "../../lib/cli-colors.js"; import { shouldOutputJson, writeOutput } from "../../output.js"; +import { conversationBackfillStep } from "./repair-step-conversation-backfill.js"; import { integrityCheckStep } from "./repair-step-integrity.js"; import type { RepairReport, RepairStep, StepResult } from "./repair-steps.js"; import { formatDurationMs, runRepairSteps } from "./repair-steps.js"; @@ -32,13 +31,10 @@ import { formatDurationMs, runRepairSteps } from "./repair-steps.js"; // --------------------------------------------------------------------------- /** - * Repair steps run in the order listed here. Order matters: - * 1. integrity-check FIRST — surfaces structural damage before we attempt - * anything that touches the same pages - * 2. 
(future) conversation backfill from /workspace/conversations - * 3. (future) memory consolidation, lost-and-found triage, etc. + * Repair steps run in the order listed here. Integrity check runs first so + * structural damage surfaces before subsequent steps touch the same pages. */ -const STEPS: RepairStep[] = [integrityCheckStep]; +const STEPS: RepairStep[] = [integrityCheckStep, conversationBackfillStep]; // --------------------------------------------------------------------------- // Rendering diff --git a/assistant/src/workspace/migrations/028-recover-conversations-from-disk-view.ts b/assistant/src/workspace/migrations/028-recover-conversations-from-disk-view.ts index 6dbf2d18d39..87e3a57c6ec 100644 --- a/assistant/src/workspace/migrations/028-recover-conversations-from-disk-view.ts +++ b/assistant/src/workspace/migrations/028-recover-conversations-from-disk-view.ts @@ -8,253 +8,28 @@ * * Idempotent: conversations already present in the DB are skipped. * Malformed files are skipped with warnings — they do not crash the migration. + * + * Core logic lives in `../recovery/conversations-from-disk.ts` and is also + * invoked by the `assistant db repair` command. 
*/ -import { randomUUID } from "node:crypto"; -import { existsSync, readdirSync, readFileSync, statSync } from "node:fs"; -import { join } from "node:path"; - -import { eq } from "drizzle-orm"; - import { getDb } from "../../memory/db-connection.js"; -import { conversations, messages } from "../../memory/schema/conversations.js"; import { getLogger } from "../../util/logger.js"; +import { recoverConversationsFromDisk } from "../recovery/conversations-from-disk.js"; import type { WorkspaceMigration } from "./types.js"; const log = getLogger("workspace-migrations"); -interface DiskMeta { - id: string; - title?: string; - type?: string; - channel?: string; - createdAt?: string; - updatedAt?: string; -} - -interface DiskToolCall { - name?: string; - input?: unknown; -} - -interface DiskToolResult { - content?: unknown; -} - -interface DiskMessageRecord { - role: string; - ts?: string; - content?: string; - toolCalls?: DiskToolCall[]; - toolResults?: DiskToolResult[]; - attachments?: unknown[]; -} - -function parseEpochMs(isoString: string | undefined): number | null { - if (!isoString) return null; - const ms = new Date(isoString).getTime(); - return Number.isNaN(ms) ? null : ms; -} - -function buildContentBlocks(record: DiskMessageRecord): unknown[] { - const blocks: unknown[] = []; - - if (record.content) { - blocks.push({ type: "text", text: record.content }); - } - - if (Array.isArray(record.toolCalls)) { - for (const tc of record.toolCalls) { - blocks.push({ - type: "tool_use", - id: randomUUID(), - name: tc.name ?? "unknown", - input: tc.input ?? {}, - }); - } - } - - if (Array.isArray(record.toolResults)) { - for (const tr of record.toolResults) { - blocks.push({ - type: "tool_result", - tool_use_id: "", - content: - typeof tr.content === "string" - ? 
tr.content - : JSON.stringify(tr.content), - }); - } - } - - // content column is NOT NULL — ensure at least one block - if (blocks.length === 0) { - blocks.push({ type: "text", text: "" }); - } - - return blocks; -} - export const recoverConversationsFromDiskViewMigration: WorkspaceMigration = { id: "028-recover-conversations-from-disk-view", description: "Recover conversations from disk-view directories into the database", run(workspaceDir: string): void { - const conversationsDir = join(workspaceDir, "conversations"); - if (!existsSync(conversationsDir)) return; - - const db = getDb(); + const { recovered, skipped, errors, warnings } = + recoverConversationsFromDisk(workspaceDir, getDb()); - let entries: string[]; - try { - entries = readdirSync(conversationsDir); - } catch (err) { - log.warn(`Failed to read conversations directory: ${err}`); - return; - } - - let recovered = 0; - let skipped = 0; - let errors = 0; - - for (const entry of entries) { - const dirPath = join(conversationsDir, entry); - - // Skip non-directories - try { - if (!statSync(dirPath).isDirectory()) { - continue; - } - } catch { - continue; - } - - // Read and parse meta.json - const metaPath = join(dirPath, "meta.json"); - if (!existsSync(metaPath)) { - log.warn( - `Skipping ${entry}: missing meta.json`, - ); - skipped++; - continue; - } - - let meta: DiskMeta; - try { - meta = JSON.parse(readFileSync(metaPath, "utf-8")) as DiskMeta; - } catch (err) { - log.warn( - `Skipping ${entry}: malformed meta.json: ${err}`, - ); - skipped++; - continue; - } - - if (!meta.id) { - log.warn( - `Skipping ${entry}: meta.json missing id`, - ); - skipped++; - continue; - } - - // Check if conversation already exists in DB (idempotency) - const existing = db - .select() - .from(conversations) - .where(eq(conversations.id, meta.id)) - .get(); - - if (existing) { - skipped++; - continue; - } - - // Parse messages.jsonl - const messagesPath = join(dirPath, "messages.jsonl"); - const messageRecords: 
DiskMessageRecord[] = []; - - if (existsSync(messagesPath)) { - try { - const raw = readFileSync(messagesPath, "utf-8"); - for (const line of raw.split("\n")) { - const trimmed = line.trim(); - if (!trimmed) continue; - try { - messageRecords.push( - JSON.parse(trimmed) as DiskMessageRecord, - ); - } catch { - log.warn( - `Skipping malformed JSONL line in ${entry}/messages.jsonl`, - ); - } - } - } catch (err) { - log.warn( - `Failed to read messages.jsonl for ${entry}: ${err}`, - ); - } - } - - // Compute timestamps - const createdAt = parseEpochMs(meta.createdAt) ?? Date.now(); - const updatedAt = parseEpochMs(meta.updatedAt) ?? createdAt; - - // Insert conversation + messages in a transaction - try { - db.transaction((tx) => { - tx.insert(conversations) - .values({ - id: meta.id, - title: meta.title ?? null, - createdAt, - updatedAt, - conversationType: meta.type ?? "standard", - originChannel: meta.channel ?? null, - source: "user", - memoryScopeId: "default", - isAutoTitle: 1, - totalInputTokens: 0, - totalOutputTokens: 0, - totalEstimatedCost: 0, - contextSummary: null, - contextCompactedMessageCount: 0, - contextCompactedAt: null, - originInterface: null, - forkParentConversationId: null, - forkParentMessageId: null, - scheduleJobId: null, - }) - .run(); - - for (const record of messageRecords) { - const contentBlocks = buildContentBlocks(record); - const msgCreatedAt = - parseEpochMs(record.ts) ?? 
createdAt; - - tx.insert(messages) - .values({ - id: randomUUID(), - conversationId: meta.id, - role: record.role, - content: JSON.stringify(contentBlocks), - createdAt: msgCreatedAt, - metadata: null, - }) - .run(); - } - }); - - recovered++; - } catch (err) { - log.warn( - `Failed to insert conversation ${meta.id} (${entry}): ${err}`, - ); - errors++; - } - } + for (const w of warnings) log.warn(w); if (recovered > 0 || errors > 0) { log.info( diff --git a/assistant/src/workspace/recovery/conversations-from-disk.ts b/assistant/src/workspace/recovery/conversations-from-disk.ts new file mode 100644 index 00000000000..606a2d3b757 --- /dev/null +++ b/assistant/src/workspace/recovery/conversations-from-disk.ts @@ -0,0 +1,295 @@ +/** + * Recover conversations from the on-disk view directories under + * `workspace/conversations//`. + * + * Each conversation directory holds: + * - `meta.json` — `{ id, title?, type?, channel?, createdAt?, updatedAt? }` + * - `messages.jsonl` — one JSON record per line, `{ role, ts?, content?, + * toolCalls?, toolResults?, attachments? }` + * + * Replaying these into SQLite reconstructs the conversation table after a + * database wipe. The function is idempotent: conversations whose id already + * exists in the DB are skipped without modification. + * + * Used by: + * - workspace migration 028 (one-shot at startup against `getDb()`) + * - `assistant db repair` conversation-backfill step (any time, against + * a connection the command opens itself) + * + * The caller supplies the drizzle instance so this function makes no + * assumptions about which database connection or workspace it's operating + * on — useful for tests, CLI tools that open their own handle, and the + * normal startup migration path. 
+ */ + +import { randomUUID } from "node:crypto"; +import { existsSync, readdirSync, readFileSync, statSync } from "node:fs"; +import { join } from "node:path"; + +import { eq } from "drizzle-orm"; + +import type { DrizzleDb } from "../../memory/db-connection.js"; +import { conversations, messages } from "../../memory/schema/conversations.js"; + +// --------------------------------------------------------------------------- +// On-disk record shapes +// --------------------------------------------------------------------------- + +interface DiskMeta { + id: string; + title?: string; + type?: string; + channel?: string; + createdAt?: string; + updatedAt?: string; +} + +interface DiskToolCall { + name?: string; + input?: unknown; +} + +interface DiskToolResult { + content?: unknown; +} + +interface DiskMessageRecord { + role: string; + ts?: string; + content?: string; + toolCalls?: DiskToolCall[]; + toolResults?: DiskToolResult[]; + attachments?: unknown[]; +} + +// --------------------------------------------------------------------------- +// Result shape +// --------------------------------------------------------------------------- + +export interface RecoveryResult { + /** Conversations newly inserted into the DB. */ + recovered: number; + /** Conversations skipped (already present, or unreadable on disk). */ + skipped: number; + /** Conversations that matched everything but failed to insert. */ + errors: number; + /** + * Human-readable warning lines, one per skip or error reason. Bounded + * by `warningCap` if provided (default unbounded). Callers that surface + * these to a user can cap further on the human-render side. + */ + warnings: string[]; +} + +export interface RecoveryOptions { + /** Cap the number of warning lines retained. Default: unbounded. 
*/ + warningCap?: number; +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function parseEpochMs(isoString: string | undefined): number | null { + if (!isoString) return null; + const ms = new Date(isoString).getTime(); + return Number.isNaN(ms) ? null : ms; +} + +function buildContentBlocks(record: DiskMessageRecord): unknown[] { + const blocks: unknown[] = []; + + if (record.content) { + blocks.push({ type: "text", text: record.content }); + } + + if (Array.isArray(record.toolCalls)) { + for (const tc of record.toolCalls) { + blocks.push({ + type: "tool_use", + id: randomUUID(), + name: tc.name ?? "unknown", + input: tc.input ?? {}, + }); + } + } + + if (Array.isArray(record.toolResults)) { + for (const tr of record.toolResults) { + blocks.push({ + type: "tool_result", + tool_use_id: "", + content: + typeof tr.content === "string" + ? tr.content + : JSON.stringify(tr.content), + }); + } + } + + // content column is NOT NULL — ensure at least one block + if (blocks.length === 0) { + blocks.push({ type: "text", text: "" }); + } + + return blocks; +} + +// --------------------------------------------------------------------------- +// Public entry point +// --------------------------------------------------------------------------- + +/** + * Walk every directory under `/conversations/` and replay the + * meta + messages into `db`. Returns counters and per-entry warnings. + * + * If the conversations directory does not exist, returns zeros with no + * warnings (it's a valid empty state — the workspace just hasn't recorded + * any conversations yet). + * + * Safe to call concurrently with a running daemon; the per-conversation + * existence check is the idempotency guard. 
+ */ +export function recoverConversationsFromDisk( + workspaceDir: string, + db: DrizzleDb, + opts: RecoveryOptions = {}, +): RecoveryResult { + const result: RecoveryResult = { + recovered: 0, + skipped: 0, + errors: 0, + warnings: [], + }; + const warningCap = opts.warningCap ?? Number.POSITIVE_INFINITY; + + const pushWarning = (line: string): void => { + if (result.warnings.length < warningCap) result.warnings.push(line); + }; + + const conversationsDir = join(workspaceDir, "conversations"); + if (!existsSync(conversationsDir)) return result; + + let entries: string[]; + try { + entries = readdirSync(conversationsDir); + } catch (err) { + pushWarning(`failed to read conversations directory: ${String(err)}`); + return result; + } + + for (const entry of entries) { + const dirPath = join(conversationsDir, entry); + + try { + if (!statSync(dirPath).isDirectory()) continue; + } catch { + continue; + } + + const metaPath = join(dirPath, "meta.json"); + if (!existsSync(metaPath)) { + pushWarning(`${entry}: missing meta.json`); + result.skipped++; + continue; + } + + let meta: DiskMeta; + try { + meta = JSON.parse(readFileSync(metaPath, "utf-8")) as DiskMeta; + } catch (err) { + pushWarning(`${entry}: malformed meta.json: ${String(err)}`); + result.skipped++; + continue; + } + + if (!meta.id) { + pushWarning(`${entry}: meta.json missing id`); + result.skipped++; + continue; + } + + const existing = db + .select() + .from(conversations) + .where(eq(conversations.id, meta.id)) + .get(); + + if (existing) { + result.skipped++; + continue; + } + + const messageRecords: DiskMessageRecord[] = []; + const messagesPath = join(dirPath, "messages.jsonl"); + if (existsSync(messagesPath)) { + try { + const raw = readFileSync(messagesPath, "utf-8"); + for (const line of raw.split("\n")) { + const trimmed = line.trim(); + if (!trimmed) continue; + try { + messageRecords.push(JSON.parse(trimmed) as DiskMessageRecord); + } catch { + pushWarning(`${entry}: malformed JSONL line in 
messages.jsonl`); + } + } + } catch (err) { + pushWarning(`${entry}: failed to read messages.jsonl: ${String(err)}`); + } + } + + const createdAt = parseEpochMs(meta.createdAt) ?? Date.now(); + const updatedAt = parseEpochMs(meta.updatedAt) ?? createdAt; + + try { + db.transaction((tx) => { + tx.insert(conversations) + .values({ + id: meta.id, + title: meta.title ?? null, + createdAt, + updatedAt, + conversationType: meta.type ?? "standard", + originChannel: meta.channel ?? null, + source: "user", + memoryScopeId: "default", + isAutoTitle: 1, + totalInputTokens: 0, + totalOutputTokens: 0, + totalEstimatedCost: 0, + contextSummary: null, + contextCompactedMessageCount: 0, + contextCompactedAt: null, + originInterface: null, + forkParentConversationId: null, + forkParentMessageId: null, + scheduleJobId: null, + }) + .run(); + + for (const record of messageRecords) { + const contentBlocks = buildContentBlocks(record); + const msgCreatedAt = parseEpochMs(record.ts) ?? createdAt; + + tx.insert(messages) + .values({ + id: randomUUID(), + conversationId: meta.id, + role: record.role, + content: JSON.stringify(contentBlocks), + createdAt: msgCreatedAt, + metadata: null, + }) + .run(); + } + }); + result.recovered++; + } catch (err) { + pushWarning(`${meta.id} (${entry}): insert failed: ${String(err)}`); + result.errors++; + } + } + + return result; +} From 8d446eaa2369ea8d37afcf8c0a3d5b16af970c6a Mon Sep 17 00:00:00 2001 From: "vellum-apollo-bot[bot]" <242025090+vellum-apollo-bot[bot]@users.noreply.github.com> Date: Sat, 30 May 2026 17:15:16 +0000 Subject: [PATCH 2/2] refactor: don't share recovery logic between migration 028 and db repair MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reverts the `workspace/recovery/conversations-from-disk.ts` shared module + the migration 028 delegator collapse. 
Migration 028 is back to its original 271-line form (unchanged from main); the repair step gets its own self-contained copy inlined into `repair-step-conversation-backfill.ts`. Migrations are frozen historical snapshots. Sharing live code between a migration and an evolving CLI command risks changing the migration's behavior on workspaces that have already run it. The two consumers should be free to drift — bug fixes or schema changes in the repair step shouldn't retroactively alter what migration 028 does. --- .../db/repair-step-conversation-backfill.ts | 281 ++++++++++++++--- ...28-recover-conversations-from-disk-view.ts | 226 +++++++++++++- .../recovery/conversations-from-disk.ts | 295 ------------------ 3 files changed, 465 insertions(+), 337 deletions(-) delete mode 100644 assistant/src/workspace/recovery/conversations-from-disk.ts diff --git a/assistant/src/cli/commands/db/repair-step-conversation-backfill.ts b/assistant/src/cli/commands/db/repair-step-conversation-backfill.ts index 5770f63f258..f2778ec1d53 100644 --- a/assistant/src/cli/commands/db/repair-step-conversation-backfill.ts +++ b/assistant/src/cli/commands/db/repair-step-conversation-backfill.ts @@ -5,24 +5,35 @@ * a `meta.json` and `messages.jsonl` written by the runtime as the source * of truth for the disk view. If the SQLite database was wiped, restored * from an old backup, or otherwise lost the `conversations`/`messages` - * rows, we can replay the on-disk files to reconstruct them. + * rows, this step replays the on-disk files to reconstruct them. * - * The core recovery logic lives in `workspace/recovery/conversations-from-disk.ts` - * and is shared with workspace migration 028 (which runs the same pass at - * startup). This step opens its own read-write bun:sqlite handle so the - * command works when the daemon is down — the whole point of the local - * transport. 
+ * Workspace migration 028 performs the same kind of recovery at startup, + * but its body is a frozen snapshot that runs against historical + * workspaces. This step owns its own copy so the live `db repair` surface + * can evolve independently — bug fixes, new edge cases, or schema changes + * don't risk altering migration 028's behavior on workspaces that have + * already run it. + * + * This step opens its own read-write bun:sqlite handle so the command + * works when the daemon is down — the whole point of the local transport. * * Idempotent: existing conversation rows are skipped without modification. */ +import { randomUUID } from "node:crypto"; +import { existsSync, readdirSync, readFileSync, statSync } from "node:fs"; +import { join } from "node:path"; import { Database } from "bun:sqlite"; +import { eq } from "drizzle-orm"; import { drizzle } from "drizzle-orm/bun-sqlite"; import * as schema from "../../../memory/schema.js"; +import { + conversations, + messages, +} from "../../../memory/schema/conversations.js"; import { getWorkspaceDir } from "../../../util/platform.js"; -import { recoverConversationsFromDisk } from "../../../workspace/recovery/conversations-from-disk.js"; import type { RepairContext, RepairStep, StepResult } from "./repair-steps.js"; /** @@ -39,12 +50,95 @@ const MAX_REPORTED_WARNING_LINES = 20; */ const WARNING_CAP_TOTAL = 500; +// --------------------------------------------------------------------------- +// On-disk record shapes +// --------------------------------------------------------------------------- + +interface DiskMeta { + id: string; + title?: string; + type?: string; + channel?: string; + createdAt?: string; + updatedAt?: string; +} + +interface DiskToolCall { + name?: string; + input?: unknown; +} + +interface DiskToolResult { + content?: unknown; +} + +interface DiskMessageRecord { + role: string; + ts?: string; + content?: string; + toolCalls?: DiskToolCall[]; + toolResults?: DiskToolResult[]; + attachments?: 
unknown[]; +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function parseEpochMs(isoString: string | undefined): number | null { + if (!isoString) return null; + const ms = new Date(isoString).getTime(); + return Number.isNaN(ms) ? null : ms; +} + +function buildContentBlocks(record: DiskMessageRecord): unknown[] { + const blocks: unknown[] = []; + + if (record.content) { + blocks.push({ type: "text", text: record.content }); + } + + if (Array.isArray(record.toolCalls)) { + for (const tc of record.toolCalls) { + blocks.push({ + type: "tool_use", + id: randomUUID(), + name: tc.name ?? "unknown", + input: tc.input ?? {}, + }); + } + } + + if (Array.isArray(record.toolResults)) { + for (const tr of record.toolResults) { + blocks.push({ + type: "tool_result", + tool_use_id: "", + content: + typeof tr.content === "string" + ? tr.content + : JSON.stringify(tr.content), + }); + } + } + + // content column is NOT NULL — ensure at least one block + if (blocks.length === 0) { + blocks.push({ type: "text", text: "" }); + } + + return blocks; +} + +// --------------------------------------------------------------------------- +// Step body +// --------------------------------------------------------------------------- + async function runConversationBackfill( ctx: RepairContext, ): Promise { // Open RW so we can insert recovered rows. Mirror the daemon's pragmas - // for consistent journal/FK behavior — anything else risks subtle drift - // between the migration path and this CLI path. + // for consistent journal/FK behavior. 
let sqlite: Database; try { sqlite = new Database(ctx.dbPath); @@ -72,41 +166,163 @@ async function runConversationBackfill( const db = drizzle(sqlite, { schema }); const workspaceDir = getWorkspaceDir(); - const result = recoverConversationsFromDisk(workspaceDir, db, { - warningCap: WARNING_CAP_TOTAL, - }); + let recovered = 0; + let skipped = 0; + let errors = 0; + const warnings: string[] = []; + const pushWarning = (line: string): void => { + if (warnings.length < WARNING_CAP_TOTAL) warnings.push(line); + }; + + const conversationsDir = join(workspaceDir, "conversations"); + if (existsSync(conversationsDir)) { + let entries: string[]; + try { + entries = readdirSync(conversationsDir); + } catch (err) { + pushWarning(`failed to read conversations directory: ${String(err)}`); + entries = []; + } + + for (const entry of entries) { + const dirPath = join(conversationsDir, entry); + + try { + if (!statSync(dirPath).isDirectory()) continue; + } catch { + continue; + } + + const metaPath = join(dirPath, "meta.json"); + if (!existsSync(metaPath)) { + pushWarning(`${entry}: missing meta.json`); + skipped++; + continue; + } + + let meta: DiskMeta; + try { + meta = JSON.parse(readFileSync(metaPath, "utf-8")) as DiskMeta; + } catch (err) { + pushWarning(`${entry}: malformed meta.json: ${String(err)}`); + skipped++; + continue; + } + + if (!meta.id) { + pushWarning(`${entry}: meta.json missing id`); + skipped++; + continue; + } + + const existing = db + .select() + .from(conversations) + .where(eq(conversations.id, meta.id)) + .get(); + + if (existing) { + skipped++; + continue; + } + + const messageRecords: DiskMessageRecord[] = []; + const messagesPath = join(dirPath, "messages.jsonl"); + if (existsSync(messagesPath)) { + try { + const raw = readFileSync(messagesPath, "utf-8"); + for (const line of raw.split("\n")) { + const trimmed = line.trim(); + if (!trimmed) continue; + try { + messageRecords.push(JSON.parse(trimmed) as DiskMessageRecord); + } catch { + 
pushWarning(`${entry}: malformed JSONL line in messages.jsonl`); + } + } + } catch (err) { + pushWarning( + `${entry}: failed to read messages.jsonl: ${String(err)}`, + ); + } + } + + const createdAt = parseEpochMs(meta.createdAt) ?? Date.now(); + const updatedAt = parseEpochMs(meta.updatedAt) ?? createdAt; + + try { + db.transaction((tx) => { + tx.insert(conversations) + .values({ + id: meta.id, + title: meta.title ?? null, + createdAt, + updatedAt, + conversationType: meta.type ?? "standard", + originChannel: meta.channel ?? null, + source: "user", + memoryScopeId: "default", + isAutoTitle: 1, + totalInputTokens: 0, + totalOutputTokens: 0, + totalEstimatedCost: 0, + contextSummary: null, + contextCompactedMessageCount: 0, + contextCompactedAt: null, + originInterface: null, + forkParentConversationId: null, + forkParentMessageId: null, + scheduleJobId: null, + }) + .run(); + + for (const record of messageRecords) { + const contentBlocks = buildContentBlocks(record); + const msgCreatedAt = parseEpochMs(record.ts) ?? createdAt; + + tx.insert(messages) + .values({ + id: randomUUID(), + conversationId: meta.id, + role: record.role, + content: JSON.stringify(contentBlocks), + createdAt: msgCreatedAt, + metadata: null, + }) + .run(); + } + }); + recovered++; + } catch (err) { + pushWarning(`${meta.id} (${entry}): insert failed: ${String(err)}`); + errors++; + } + } + } const summary = - result.recovered === 0 && result.errors === 0 - ? `nothing to backfill (${result.skipped} on-disk conversation${result.skipped === 1 ? "" : "s"} already present)` - : `recovered ${result.recovered}, skipped ${result.skipped}, ${result.errors} error${result.errors === 1 ? "" : "s"}`; - - const truncatedWarnings = result.warnings.slice( - 0, - MAX_REPORTED_WARNING_LINES, - ); + recovered === 0 && errors === 0 + ? `nothing to backfill (${skipped} on-disk conversation${skipped === 1 ? 
"" : "s"} already present)` + : `recovered ${recovered}, skipped ${skipped}, ${errors} error${errors === 1 ? "" : "s"}`; + + const truncatedWarnings = warnings.slice(0, MAX_REPORTED_WARNING_LINES); const detailLines = - result.warnings.length > MAX_REPORTED_WARNING_LINES + warnings.length > MAX_REPORTED_WARNING_LINES ? [ ...truncatedWarnings, - `+ ${result.warnings.length - MAX_REPORTED_WARNING_LINES} more (use --json for full list)`, + `+ ${warnings.length - MAX_REPORTED_WARNING_LINES} more (use --json for full list)`, ] : truncatedWarnings; // Errors during insert are surfaced as a non-halting failure so later // steps still run. Warnings without errors (malformed JSONL lines, // missing meta.json) are not themselves a failure — they're skips. - if (result.errors > 0) { + if (errors > 0) { return { status: "error", summary, detailLines, - data: { - recovered: result.recovered, - skipped: result.skipped, - errors: result.errors, - warnings: result.warnings, - }, + data: { recovered, skipped, errors, warnings }, }; } @@ -114,12 +330,7 @@ async function runConversationBackfill( status: "ok", summary, detailLines, - data: { - recovered: result.recovered, - skipped: result.skipped, - errors: result.errors, - warnings: result.warnings, - }, + data: { recovered, skipped, errors, warnings }, }; } finally { sqlite.close(); diff --git a/assistant/src/workspace/migrations/028-recover-conversations-from-disk-view.ts b/assistant/src/workspace/migrations/028-recover-conversations-from-disk-view.ts index 87e3a57c6ec..257c906bdf5 100644 --- a/assistant/src/workspace/migrations/028-recover-conversations-from-disk-view.ts +++ b/assistant/src/workspace/migrations/028-recover-conversations-from-disk-view.ts @@ -8,28 +8,240 @@ * * Idempotent: conversations already present in the DB are skipped. * Malformed files are skipped with warnings — they do not crash the migration. 
- * - * Core logic lives in `../recovery/conversations-from-disk.ts` and is also - * invoked by the `assistant db repair` command. */ +import { randomUUID } from "node:crypto"; +import { existsSync, readdirSync, readFileSync, statSync } from "node:fs"; +import { join } from "node:path"; + +import { eq } from "drizzle-orm"; + import { getDb } from "../../memory/db-connection.js"; +import { conversations, messages } from "../../memory/schema/conversations.js"; import { getLogger } from "../../util/logger.js"; -import { recoverConversationsFromDisk } from "../recovery/conversations-from-disk.js"; import type { WorkspaceMigration } from "./types.js"; const log = getLogger("workspace-migrations"); +interface DiskMeta { + id: string; + title?: string; + type?: string; + channel?: string; + createdAt?: string; + updatedAt?: string; +} + +interface DiskToolCall { + name?: string; + input?: unknown; +} + +interface DiskToolResult { + content?: unknown; +} + +interface DiskMessageRecord { + role: string; + ts?: string; + content?: string; + toolCalls?: DiskToolCall[]; + toolResults?: DiskToolResult[]; + attachments?: unknown[]; +} + +function parseEpochMs(isoString: string | undefined): number | null { + if (!isoString) return null; + const ms = new Date(isoString).getTime(); + return Number.isNaN(ms) ? null : ms; +} + +function buildContentBlocks(record: DiskMessageRecord): unknown[] { + const blocks: unknown[] = []; + + if (record.content) { + blocks.push({ type: "text", text: record.content }); + } + + if (Array.isArray(record.toolCalls)) { + for (const tc of record.toolCalls) { + blocks.push({ + type: "tool_use", + id: randomUUID(), + name: tc.name ?? "unknown", + input: tc.input ?? {}, + }); + } + } + + if (Array.isArray(record.toolResults)) { + for (const tr of record.toolResults) { + blocks.push({ + type: "tool_result", + tool_use_id: "", + content: + typeof tr.content === "string" + ? 
tr.content + : JSON.stringify(tr.content), + }); + } + } + + // content column is NOT NULL — ensure at least one block + if (blocks.length === 0) { + blocks.push({ type: "text", text: "" }); + } + + return blocks; +} + export const recoverConversationsFromDiskViewMigration: WorkspaceMigration = { id: "028-recover-conversations-from-disk-view", description: "Recover conversations from disk-view directories into the database", run(workspaceDir: string): void { - const { recovered, skipped, errors, warnings } = - recoverConversationsFromDisk(workspaceDir, getDb()); + const conversationsDir = join(workspaceDir, "conversations"); + if (!existsSync(conversationsDir)) return; + + const db = getDb(); - for (const w of warnings) log.warn(w); + let entries: string[]; + try { + entries = readdirSync(conversationsDir); + } catch (err) { + log.warn(`Failed to read conversations directory: ${err}`); + return; + } + + let recovered = 0; + let skipped = 0; + let errors = 0; + + for (const entry of entries) { + const dirPath = join(conversationsDir, entry); + + // Skip non-directories + try { + if (!statSync(dirPath).isDirectory()) { + continue; + } + } catch { + continue; + } + + // Read and parse meta.json + const metaPath = join(dirPath, "meta.json"); + if (!existsSync(metaPath)) { + log.warn(`Skipping ${entry}: missing meta.json`); + skipped++; + continue; + } + + let meta: DiskMeta; + try { + meta = JSON.parse(readFileSync(metaPath, "utf-8")) as DiskMeta; + } catch (err) { + log.warn(`Skipping ${entry}: malformed meta.json: ${err}`); + skipped++; + continue; + } + + if (!meta.id) { + log.warn(`Skipping ${entry}: meta.json missing id`); + skipped++; + continue; + } + + // Check if conversation already exists in DB (idempotency) + const existing = db + .select() + .from(conversations) + .where(eq(conversations.id, meta.id)) + .get(); + + if (existing) { + skipped++; + continue; + } + + // Parse messages.jsonl + const messagesPath = join(dirPath, "messages.jsonl"); + const 
messageRecords: DiskMessageRecord[] = []; + + if (existsSync(messagesPath)) { + try { + const raw = readFileSync(messagesPath, "utf-8"); + for (const line of raw.split("\n")) { + const trimmed = line.trim(); + if (!trimmed) continue; + try { + messageRecords.push(JSON.parse(trimmed) as DiskMessageRecord); + } catch { + log.warn( + `Skipping malformed JSONL line in ${entry}/messages.jsonl`, + ); + } + } + } catch (err) { + log.warn(`Failed to read messages.jsonl for ${entry}: ${err}`); + } + } + + // Compute timestamps + const createdAt = parseEpochMs(meta.createdAt) ?? Date.now(); + const updatedAt = parseEpochMs(meta.updatedAt) ?? createdAt; + + // Insert conversation + messages in a transaction + try { + db.transaction((tx) => { + tx.insert(conversations) + .values({ + id: meta.id, + title: meta.title ?? null, + createdAt, + updatedAt, + conversationType: meta.type ?? "standard", + originChannel: meta.channel ?? null, + source: "user", + memoryScopeId: "default", + isAutoTitle: 1, + totalInputTokens: 0, + totalOutputTokens: 0, + totalEstimatedCost: 0, + contextSummary: null, + contextCompactedMessageCount: 0, + contextCompactedAt: null, + originInterface: null, + forkParentConversationId: null, + forkParentMessageId: null, + scheduleJobId: null, + }) + .run(); + + for (const record of messageRecords) { + const contentBlocks = buildContentBlocks(record); + const msgCreatedAt = parseEpochMs(record.ts) ?? 
createdAt; + + tx.insert(messages) + .values({ + id: randomUUID(), + conversationId: meta.id, + role: record.role, + content: JSON.stringify(contentBlocks), + createdAt: msgCreatedAt, + metadata: null, + }) + .run(); + } + }); + + recovered++; + } catch (err) { + log.warn(`Failed to insert conversation ${meta.id} (${entry}): ${err}`); + errors++; + } + } if (recovered > 0 || errors > 0) { log.info( diff --git a/assistant/src/workspace/recovery/conversations-from-disk.ts b/assistant/src/workspace/recovery/conversations-from-disk.ts deleted file mode 100644 index 606a2d3b757..00000000000 --- a/assistant/src/workspace/recovery/conversations-from-disk.ts +++ /dev/null @@ -1,295 +0,0 @@ -/** - * Recover conversations from the on-disk view directories under - * `workspace/conversations//`. - * - * Each conversation directory holds: - * - `meta.json` — `{ id, title?, type?, channel?, createdAt?, updatedAt? }` - * - `messages.jsonl` — one JSON record per line, `{ role, ts?, content?, - * toolCalls?, toolResults?, attachments? }` - * - * Replaying these into SQLite reconstructs the conversation table after a - * database wipe. The function is idempotent: conversations whose id already - * exists in the DB are skipped without modification. - * - * Used by: - * - workspace migration 028 (one-shot at startup against `getDb()`) - * - `assistant db repair` conversation-backfill step (any time, against - * a connection the command opens itself) - * - * The caller supplies the drizzle instance so this function makes no - * assumptions about which database connection or workspace it's operating - * on — useful for tests, CLI tools that open their own handle, and the - * normal startup migration path. 
- */ - -import { randomUUID } from "node:crypto"; -import { existsSync, readdirSync, readFileSync, statSync } from "node:fs"; -import { join } from "node:path"; - -import { eq } from "drizzle-orm"; - -import type { DrizzleDb } from "../../memory/db-connection.js"; -import { conversations, messages } from "../../memory/schema/conversations.js"; - -// --------------------------------------------------------------------------- -// On-disk record shapes -// --------------------------------------------------------------------------- - -interface DiskMeta { - id: string; - title?: string; - type?: string; - channel?: string; - createdAt?: string; - updatedAt?: string; -} - -interface DiskToolCall { - name?: string; - input?: unknown; -} - -interface DiskToolResult { - content?: unknown; -} - -interface DiskMessageRecord { - role: string; - ts?: string; - content?: string; - toolCalls?: DiskToolCall[]; - toolResults?: DiskToolResult[]; - attachments?: unknown[]; -} - -// --------------------------------------------------------------------------- -// Result shape -// --------------------------------------------------------------------------- - -export interface RecoveryResult { - /** Conversations newly inserted into the DB. */ - recovered: number; - /** Conversations skipped (already present, or unreadable on disk). */ - skipped: number; - /** Conversations that matched everything but failed to insert. */ - errors: number; - /** - * Human-readable warning lines, one per skip or error reason. Bounded - * by `warningCap` if provided (default unbounded). Callers that surface - * these to a user can cap further on the human-render side. - */ - warnings: string[]; -} - -export interface RecoveryOptions { - /** Cap the number of warning lines retained. Default: unbounded. 
*/ - warningCap?: number; -} - -// --------------------------------------------------------------------------- -// Helpers -// --------------------------------------------------------------------------- - -function parseEpochMs(isoString: string | undefined): number | null { - if (!isoString) return null; - const ms = new Date(isoString).getTime(); - return Number.isNaN(ms) ? null : ms; -} - -function buildContentBlocks(record: DiskMessageRecord): unknown[] { - const blocks: unknown[] = []; - - if (record.content) { - blocks.push({ type: "text", text: record.content }); - } - - if (Array.isArray(record.toolCalls)) { - for (const tc of record.toolCalls) { - blocks.push({ - type: "tool_use", - id: randomUUID(), - name: tc.name ?? "unknown", - input: tc.input ?? {}, - }); - } - } - - if (Array.isArray(record.toolResults)) { - for (const tr of record.toolResults) { - blocks.push({ - type: "tool_result", - tool_use_id: "", - content: - typeof tr.content === "string" - ? tr.content - : JSON.stringify(tr.content), - }); - } - } - - // content column is NOT NULL — ensure at least one block - if (blocks.length === 0) { - blocks.push({ type: "text", text: "" }); - } - - return blocks; -} - -// --------------------------------------------------------------------------- -// Public entry point -// --------------------------------------------------------------------------- - -/** - * Walk every directory under `/conversations/` and replay the - * meta + messages into `db`. Returns counters and per-entry warnings. - * - * If the conversations directory does not exist, returns zeros with no - * warnings (it's a valid empty state — the workspace just hasn't recorded - * any conversations yet). - * - * Safe to call concurrently with a running daemon; the per-conversation - * existence check is the idempotency guard. 
/**
 * Replay every conversation directory under `<workspaceDir>/conversations/`
 * into `db`, returning counters and per-entry warnings.
 *
 * For each directory entry the function reads `meta.json`, skips the entry
 * when a conversation with the same id already exists, parses
 * `messages.jsonl` (when present), and inserts the conversation row plus one
 * message row per valid record inside a single transaction. The existence
 * check is what makes repeated runs a no-op for already-recovered
 * conversations.
 *
 * Failure handling:
 * - A missing conversations directory returns all-zero counters and no
 *   warnings.
 * - A missing, unparseable, non-object, or id-less `meta.json` counts as
 *   `skipped` and adds a warning.
 * - An unreadable `messages.jsonl`, an unparseable line, or a line that is
 *   not an object with a string `role` adds a warning; the conversation is
 *   still inserted with whatever valid records remain.
 * - A transaction failure counts as `errors` and adds a warning.
 *
 * @param workspaceDir - Directory that contains the `conversations` folder.
 * @param db - Drizzle handle used for the existence check and the inserts.
 * @param opts - Optional settings; `warningCap` bounds retained warnings.
 * @returns Counters for recovered / skipped / errored conversations and the
 *   collected warning lines.
 */
export function recoverConversationsFromDisk(
  workspaceDir: string,
  db: DrizzleDb,
  opts: RecoveryOptions = {},
): RecoveryResult {
  const result: RecoveryResult = {
    recovered: 0,
    skipped: 0,
    errors: 0,
    warnings: [],
  };
  const warningCap = opts.warningCap ?? Number.POSITIVE_INFINITY;

  // Warnings past the cap are dropped silently; counters stay accurate.
  const pushWarning = (line: string): void => {
    if (result.warnings.length < warningCap) result.warnings.push(line);
  };

  const conversationsDir = join(workspaceDir, "conversations");
  if (!existsSync(conversationsDir)) return result;

  let entries: string[];
  try {
    entries = readdirSync(conversationsDir);
  } catch (err) {
    pushWarning(`failed to read conversations directory: ${String(err)}`);
    return result;
  }

  for (const entry of entries) {
    const dirPath = join(conversationsDir, entry);

    // Non-directories (and entries that cannot be stat'ed) are ignored
    // without touching any counter.
    try {
      if (!statSync(dirPath).isDirectory()) continue;
    } catch {
      continue;
    }

    const metaPath = join(dirPath, "meta.json");
    if (!existsSync(metaPath)) {
      pushWarning(`${entry}: missing meta.json`);
      result.skipped++;
      continue;
    }

    let parsedMeta: unknown;
    try {
      parsedMeta = JSON.parse(readFileSync(metaPath, "utf-8"));
    } catch (err) {
      pushWarning(`${entry}: malformed meta.json: ${String(err)}`);
      result.skipped++;
      continue;
    }

    // Valid JSON is not necessarily an object: a literal `null` would throw
    // on property access, so the shape is checked before `id` is read.
    if (!isDiskMeta(parsedMeta)) {
      pushWarning(`${entry}: meta.json missing id`);
      result.skipped++;
      continue;
    }
    const meta = parsedMeta;

    // Only the id is needed to decide whether the conversation exists.
    const existing = db
      .select({ id: conversations.id })
      .from(conversations)
      .where(eq(conversations.id, meta.id))
      .get();

    if (existing) {
      result.skipped++;
      continue;
    }

    const messageRecords = readMessageRecords(
      entry,
      join(dirPath, "messages.jsonl"),
      pushWarning,
    );

    const createdAt = parseEpochMs(meta.createdAt) ?? Date.now();
    const updatedAt = parseEpochMs(meta.updatedAt) ?? createdAt;

    try {
      // Conversation and messages go in together or not at all.
      db.transaction((tx) => {
        tx.insert(conversations)
          .values({
            id: meta.id,
            title: meta.title ?? null,
            createdAt,
            updatedAt,
            conversationType: meta.type ?? "standard",
            originChannel: meta.channel ?? null,
            source: "user",
            memoryScopeId: "default",
            isAutoTitle: 1,
            totalInputTokens: 0,
            totalOutputTokens: 0,
            totalEstimatedCost: 0,
            contextSummary: null,
            contextCompactedMessageCount: 0,
            contextCompactedAt: null,
            originInterface: null,
            forkParentConversationId: null,
            forkParentMessageId: null,
            scheduleJobId: null,
          })
          .run();

        for (const record of messageRecords) {
          const contentBlocks = buildContentBlocks(record);
          // Messages without a usable timestamp inherit the conversation's.
          const msgCreatedAt = parseEpochMs(record.ts) ?? createdAt;

          tx.insert(messages)
            .values({
              id: randomUUID(),
              conversationId: meta.id,
              role: record.role,
              content: JSON.stringify(contentBlocks),
              createdAt: msgCreatedAt,
              metadata: null,
            })
            .run();
        }
      });
      result.recovered++;
    } catch (err) {
      pushWarning(`${meta.id} (${entry}): insert failed: ${String(err)}`);
      result.errors++;
    }
  }

  return result;
}

/** True when `value` is a non-null object carrying a truthy `id`. */
function isDiskMeta(value: unknown): value is DiskMeta {
  return (
    typeof value === "object" &&
    value !== null &&
    Boolean((value as { id?: unknown }).id)
  );
}

/** True when `value` is a non-null object whose `role` is a string. */
function isDiskMessageRecord(value: unknown): value is DiskMessageRecord {
  return (
    typeof value === "object" &&
    value !== null &&
    typeof (value as { role?: unknown }).role === "string"
  );
}

/**
 * Load the valid message records from a conversation's `messages.jsonl`.
 *
 * Blank lines are ignored. Lines that fail to parse, or that parse to
 * something other than an object with a string `role`, are reported through
 * `warn` and skipped so one bad line cannot sink the whole conversation.
 * A missing file yields an empty list with no warning; an unreadable file
 * yields an empty list with one warning.
 */
function readMessageRecords(
  entry: string,
  messagesPath: string,
  warn: (line: string) => void,
): DiskMessageRecord[] {
  const records: DiskMessageRecord[] = [];
  if (!existsSync(messagesPath)) return records;

  let raw: string;
  try {
    raw = readFileSync(messagesPath, "utf-8");
  } catch (err) {
    warn(`${entry}: failed to read messages.jsonl: ${String(err)}`);
    return records;
  }

  for (const line of raw.split("\n")) {
    const trimmed = line.trim();
    if (!trimmed) continue;

    let parsed: unknown;
    try {
      parsed = JSON.parse(trimmed);
    } catch {
      warn(`${entry}: malformed JSONL line in messages.jsonl`);
      continue;
    }

    if (!isDiskMessageRecord(parsed)) {
      warn(`${entry}: JSONL record without a string role in messages.jsonl`);
      continue;
    }
    records.push(parsed);
  }

  return records;
}