diff --git a/packages/adapters/claude/src/classify.ts b/packages/adapters/claude/src/classify.ts index abd0abc6..e4b7dbf7 100644 --- a/packages/adapters/claude/src/classify.ts +++ b/packages/adapters/claude/src/classify.ts @@ -87,9 +87,13 @@ function readBlockedSentinel(path: string): BlockedSentinel | null { const parsed = JSON.parse(readFileSync(path, "utf8")) as Record; if (typeof parsed.question !== "string" || parsed.question.length === 0) return null; const context = typeof parsed.context === "string" ? parsed.context : undefined; - return context === undefined - ? { question: parsed.question } - : { question: parsed.question, context }; + // Only "complexity" is a recognized non-default kind; anything else (or + // absent) is a plain question. + const kind = parsed.kind === "complexity" ? "complexity" : undefined; + const out: BlockedSentinel = { question: parsed.question }; + if (context !== undefined) out.context = context; + if (kind !== undefined) out.kind = kind; + return out; } catch { return null; } diff --git a/packages/adapters/claude/test/adapter.test.ts b/packages/adapters/claude/test/adapter.test.ts index 7b13f91d..9dd8ce74 100644 --- a/packages/adapters/claude/test/adapter.test.ts +++ b/packages/adapters/claude/test/adapter.test.ts @@ -231,6 +231,42 @@ describe("classifyStop", () => { } }); + test("a blocked.json with kind 'complexity' surfaces the complexity pause kind", () => { + const { cwd, middle, transcript } = writeMiddleDir(); + writeFileSync( + join(middle, "blocked.json"), + JSON.stringify({ question: "4 designs, no winner", kind: "complexity" }), + ); + const result = claudeAdapter.classifyStop({ + payload: { cwd }, + transcriptPath: transcript, + sentinelPresent: true, + worktree: cwd, + }); + expect(result.kind).toBe("asked-question"); + if (result.kind === "asked-question") { + expect(result.sentinel).toEqual({ question: "4 designs, no winner", kind: "complexity" }); + } + }); + + test("an unrecognized kind falls back to a plain question (kind omitted)", () => { + const { cwd, middle, transcript } = writeMiddleDir(); + writeFileSync( + join(middle, "blocked.json"), + JSON.stringify({ question: "Q", kind: "whatever" }), + ); + const result = claudeAdapter.classifyStop({ + payload: { cwd }, + transcriptPath: transcript, + sentinelPresent: true, + worktree: cwd, + }); + expect(result.kind).toBe("asked-question"); + if (result.kind === "asked-question") { + expect(result.sentinel).toEqual({ question: "Q" }); + } + }); + test("asked-question tolerates a malformed/contentless blocked.json (sentinel → null)", () => { const { cwd, middle, transcript } = writeMiddleDir(); writeFileSync(join(middle, "blocked.json"), "{ not valid json"); diff --git a/packages/cli/src/commands/config.ts b/packages/cli/src/commands/config.ts new file mode 100644 index 00000000..f28823d2 --- /dev/null +++ b/packages/cli/src/commands/config.ts @@ -0,0 +1,107 @@ +import { existsSync, readFileSync, writeFileSync } from "node:fs"; +import { join } from "node:path"; + +/** Overrides for {@link runConfig} — lets a caller (or a test) point at a config file other than the default. */ +export type ConfigOptions = { + /** Override the per-repo config path (defaults to `/.middle/config.toml`). */ + configFile?: string; +}; + +/** + * The keys `mm config` can set, with where they live and how the value is + * validated/normalized. v1 ships only `auto_dispatch`; the table is the + * extension point for further keys. + */ +const SETTABLE: Record string | null }> = { + auto_dispatch: { + section: "recommender", + normalize: (raw) => (raw === "true" || raw === "false" ? raw : null), + }, +}; + +/** + * Set `key = value` within `[section]`, preserving the rest of the file + * byte-for-byte (comments, ordering, unrelated keys). Replaces the key in place + * if present in that section, inserts it just under the section header if the + * section exists, or appends a fresh section. The match is scoped to the target + * section so an identically-named key in another section is never touched. + */ +function setTomlKey(source: string, section: string, key: string, value: string): string { + const lines = source.split("\n"); + // A TOML table header: `[section]`, tolerating whitespace inside the brackets + // (`[ section ]`) and a trailing line comment (`[section] # note`) — both are + // valid TOML the bare-`[section]` form would miss, appending a duplicate + // table. Returns the trimmed section name, or null if the line isn't a header. + const headerRe = /^\s*\[([^\]]+)\]\s*(?:#.*)?$/; + const headerName = (line: string): string | null => { + const m = headerRe.exec(line); + return m ? m[1]!.trim() : null; + }; + // Escape the key — `SETTABLE` is the extension point, and a future key with + // regex metacharacters must match literally, not as a pattern. + const escapedKey = key.replace(/[.*+?^${}()|[\]\\]/g, "\\$&"); + const keyRe = new RegExp(`^(\\s*)${escapedKey}\\s*=`); + let sectionStart = -1; + for (let i = 0; i < lines.length; i += 1) { + if (headerName(lines[i]!) === section) { + sectionStart = i; + break; + } + } + const assignment = `${key} = ${value}`; + if (sectionStart === -1) { + // No such section — append it. Keep exactly one blank line of separation. + const trimmed = source.replace(/\n+$/, ""); + return `${trimmed}\n\n[${section}]\n${assignment}\n`; + } + // Scan the section body (until the next header or EOF) for the key. + for (let i = sectionStart + 1; i < lines.length; i += 1) { + if (headerName(lines[i]!) !== null) break; // next section — key absent in this one + if (keyRe.test(lines[i]!)) { + lines[i] = lines[i]!.replace(keyRe, `$1${key} =`).replace(/=.*/, `= ${value}`); + return lines.join("\n"); + } + } + // Section exists but lacks the key — insert right after the header. + lines.splice(sectionStart + 1, 0, assignment); + return lines.join("\n"); +} + +/** + * `mm config ` — set a per-repo config value in + * `/.middle/config.toml`, preserving the file's comments and layout. v1 + * supports `auto_dispatch ` (the `[recommender]` toggle the + * auto-dispatch loop reads). Returns a process exit code: 0 on success, 1 on error. + */ +export function runConfig( + repoPath: string, + key: string, + value: string, + opts: ConfigOptions = {}, +): number { + const spec = SETTABLE[key]; + if (!spec) { + const known = Object.keys(SETTABLE).join(", "); + console.error(`mm config: unknown key "${key}" (settable keys: ${known})`); + return 1; + } + const normalized = spec.normalize(value); + if (normalized === null) { + console.error(`mm config: invalid value "${value}" for ${key}`); + return 1; + } + const configFile = opts.configFile ?? join(repoPath, ".middle", "config.toml"); + if (!existsSync(configFile)) { + console.error(`mm config: no config at ${configFile} (run \`mm init\` first)`); + return 1; + } + try { + const updated = setTomlKey(readFileSync(configFile, "utf8"), spec.section, key, normalized); + writeFileSync(configFile, updated); + console.log(`mm config: set ${spec.section}.${key} = ${normalized}`); + return 0; + } catch (error) { + console.error(`mm config: ${(error as Error).message}`); + return 1; + } +} diff --git a/packages/cli/src/commands/pause.ts b/packages/cli/src/commands/pause.ts new file mode 100644 index 00000000..0a083def --- /dev/null +++ b/packages/cli/src/commands/pause.ts @@ -0,0 +1,87 @@ +import type { Database } from "bun:sqlite"; +import { existsSync } from "node:fs"; +import { join } from "node:path"; +import { loadConfig } from "@middle/core"; +import { openAndMigrate } from "@middle/dispatcher/src/db.ts"; +import { clearPaused, setPausedUntil } from "@middle/dispatcher/src/repo-config.ts"; +import { deriveRepoSlug } from "../paths.ts"; + +/** + * Overrides for {@link runPause} / {@link runResume} — the config and db paths + * and the repo-slug derivation, so a caller (or a test) can redirect them away + * from the on-disk defaults and the live git remote. + */ +export type PauseResumeOptions = { + /** Override the global config path (defaults to `~/.middle/config.toml`). */ + configPath?: string; + /** Override the database path (defaults to the config's `db_path`). */ + dbPath?: string; + /** Resolve the repo's `owner/name` slug (defaults to the git-remote derivation). */ + resolveSlug?: (repoPath: string) => Promise; +}; + +/** Resolve the db path + the repo slug shared by `mm pause` and `mm resume`. */ +async function resolve( + command: string, + repoPath: string, + opts: PauseResumeOptions, +): Promise<{ dbPath: string; repo: string } | null> { + if (!existsSync(join(repoPath, ".git"))) { + console.error(`mm ${command}: "${repoPath}" is not a git repository`); + return null; + } + let dbPath: string; + try { + dbPath = opts.dbPath ?? loadConfig({ globalPath: opts.configPath }).global.dbPath; + } catch (error) { + console.error(`mm ${command}: failed to load config — ${(error as Error).message}`); + return null; + } + const repo = await (opts.resolveSlug ?? deriveRepoSlug)(repoPath); + return { dbPath, repo }; +} + +/** + * `mm pause ` — suspend auto-dispatch for a repo by setting its + * `repo_config.paused_until`. With no duration the pause is indefinite (cleared + * by `mm resume`). The auto-dispatch loop skips a paused repo. Returns a process + * exit code: 0 on success, 1 on error. + */ +export async function runPause(repoPath: string, opts: PauseResumeOptions = {}): Promise { + let db: Database | null = null; + try { + const resolved = await resolve("pause", repoPath, opts); + if (!resolved) return 1; + db = openAndMigrate(resolved.dbPath); + setPausedUntil(db, resolved.repo); + console.log(`mm pause: ${resolved.repo} auto-dispatch paused (resume with \`mm resume\`)`); + return 0; + } catch (error) { + console.error(`mm pause: ${(error as Error).message}`); + return 1; + } finally { + db?.close(); + } +} + +/** + * `mm resume ` — clear a repo's pause (`repo_config.paused_until`), so the + * auto-dispatch loop considers it again. A no-op if the repo was never paused. + * Returns a process exit code: 0 on success, 1 on error. + */ +export async function runResume(repoPath: string, opts: PauseResumeOptions = {}): Promise { + let db: Database | null = null; + try { + const resolved = await resolve("resume", repoPath, opts); + if (!resolved) return 1; + db = openAndMigrate(resolved.dbPath); + clearPaused(db, resolved.repo); + console.log(`mm resume: ${resolved.repo} auto-dispatch resumed`); + return 0; + } catch (error) { + console.error(`mm resume: ${(error as Error).message}`); + return 1; + } finally { + db?.close(); + } +} diff --git a/packages/cli/src/index.ts b/packages/cli/src/index.ts index 78ba6dd3..e607affd 100755 --- a/packages/cli/src/index.ts +++ b/packages/cli/src/index.ts @@ -8,7 +8,7 @@ * * Public surface: * - the `mm` CLI: `init`, `uninit`, `start`, `stop`, `status`, `doctor`, - * `dispatch`, `run-recommender`, `docs`, `version` + * `dispatch`, `pause`, `resume`, `config`, `run-recommender`, `docs`, `version` * * Where things live: * - `commands/` — one `run*` function per subcommand @@ -23,10 +23,12 @@ * claude-md: false */ import { Command } from "commander"; +import { runConfig } from "./commands/config.ts"; import { runDispatch } from "./commands/dispatch.ts"; import { runDocs } from "./commands/docs.ts"; import { runDoctor } from "./commands/doctor.ts"; import { runInit } from "./commands/init.ts"; +import { runPause, runResume } from "./commands/pause.ts"; import { runRecommender } from "./commands/run-recommender.ts"; import { runStart } from "./commands/start.ts"; import { runStatus } from "./commands/status.ts"; @@ -94,6 +96,26 @@ program .argument("", "path to the local repo checkout") .action(async (repo: string) => process.exit(await runRecommender(repo))); +program + .command("pause") + .description("Pause auto-dispatch for a repo (set repo_config.paused_until)") + .argument("", "path to the local repo checkout") + .action(async (repo: string) => process.exit(await runPause(repo))); + +program + .command("resume") + .description("Resume auto-dispatch for a repo (clear its pause)") + .argument("", "path to the local repo checkout") + .action(async (repo: string) => process.exit(await runResume(repo))); + +program + .command("config") + .description("Set a per-repo config value (e.g. auto_dispatch true)") + .argument("", "path to the local repo checkout") + .argument("", "config key (e.g. auto_dispatch)") + .argument("", "the value to set") + .action((repo: string, key: string, value: string) => process.exit(runConfig(repo, key, value))); + program .command("docs") .description( diff --git a/packages/cli/src/paths.ts b/packages/cli/src/paths.ts index 80c3e4f1..39b9f8d1 100644 --- a/packages/cli/src/paths.ts +++ b/packages/cli/src/paths.ts @@ -1,5 +1,5 @@ import { homedir } from "node:os"; -import { join } from "node:path"; +import { basename, join } from "node:path"; /** middle's per-user home — `~/.middle`. */ export function middleHome(): string { @@ -10,3 +10,23 @@ export function middleHome(): string { export function defaultPidFile(): string { return join(middleHome(), "dispatcher.pid"); } + +/** + * Derive an `owner/name` slug from a repo checkout's `origin` remote, falling + * back to its directory name. This is the same key the dispatcher's workflows + * and `repo_config` rows use (a manual dispatch / recommender run resolves the + * slug the same way), so DB-keyed commands like `mm pause` must derive it + * identically or they'd write a row the auto-dispatch loop never reads. + */ +export async function deriveRepoSlug(repoPath: string): Promise { + const proc = Bun.spawn(["git", "-C", repoPath, "remote", "get-url", "origin"], { + stdout: "pipe", + stderr: "ignore", + }); + const url = (await new Response(proc.stdout).text()).trim(); + if ((await proc.exited) === 0 && url) { + const match = /[:/]([^/]+\/[^/]+?)(?:\.git)?$/.exec(url); + if (match) return match[1]!; + } + return basename(repoPath); +} diff --git a/packages/cli/test/config.test.ts b/packages/cli/test/config.test.ts new file mode 100644 index 00000000..1512bddb --- /dev/null +++ b/packages/cli/test/config.test.ts @@ -0,0 +1,100 @@ +import { afterEach, beforeEach, describe, expect, spyOn, test } from "bun:test"; +import { mkdtempSync, readFileSync, rmSync, writeFileSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { loadConfig } from "@middle/core"; +import { runConfig } from "../src/commands/config.ts"; + +let dir: string; +let configFile: string; + +beforeEach(() => { + dir = mkdtempSync(join(tmpdir(), "middle-cli-config-")); + configFile = join(dir, "config.toml"); +}); + +afterEach(() => { + rmSync(dir, { recursive: true, force: true }); +}); + +function silence(fn: () => T): T { + const log = spyOn(console, "log").mockImplementation(() => {}); + const err = spyOn(console, "error").mockImplementation(() => {}); + try { + return fn(); + } finally { + log.mockRestore(); + err.mockRestore(); + } +} + +describe("mm config auto_dispatch", () => { + test("flips an existing toggle in place, preserving comments and other keys", () => { + writeFileSync( + configFile, + `[recommender] +enabled = true +# opt in per repo +auto_dispatch = false +interval_minutes = 15 +`, + ); + const code = silence(() => runConfig(dir, "auto_dispatch", "true", { configFile })); + expect(code).toBe(0); + const text = readFileSync(configFile, "utf8"); + expect(text).toContain("auto_dispatch = true"); + // Comment and siblings survive untouched. + expect(text).toContain("# opt in per repo"); + expect(text).toContain("interval_minutes = 15"); + expect(text).toContain("enabled = true"); + // And the merged config reflects it. + expect(loadConfig({ repoPath: configFile }).recommender?.autoDispatch).toBe(true); + }); + + test("inserts the key when the [recommender] section lacks it", () => { + writeFileSync(configFile, `[recommender]\nenabled = true\n`); + expect(silence(() => runConfig(dir, "auto_dispatch", "true", { configFile }))).toBe(0); + expect(loadConfig({ repoPath: configFile }).recommender?.autoDispatch).toBe(true); + }); + + test("appends the section when it does not exist", () => { + writeFileSync(configFile, `[repo]\nowner = "o"\nname = "r"\n`); + expect(silence(() => runConfig(dir, "auto_dispatch", "false", { configFile }))).toBe(0); + const cfg = loadConfig({ repoPath: configFile }); + expect(cfg.recommender?.autoDispatch).toBe(false); + expect(cfg.repo?.owner).toBe("o"); // untouched + }); + + test("matches a header with a trailing comment in place (no duplicate section)", () => { + writeFileSync(configFile, `[recommender] # opt-in toggles\nauto_dispatch = false\n`); + expect(silence(() => runConfig(dir, "auto_dispatch", "true", { configFile }))).toBe(0); + const text = readFileSync(configFile, "utf8"); + // Flipped in place — the section header's comment survives and no second + // `[recommender]` was appended. + expect(text).toContain("auto_dispatch = true"); + expect(text).toContain("[recommender] # opt-in toggles"); + expect(text.match(/^\s*\[recommender\]/gm)).toHaveLength(1); + expect(loadConfig({ repoPath: configFile }).recommender?.autoDispatch).toBe(true); + }); + + test("matches a header with whitespace inside the brackets (no duplicate section)", () => { + writeFileSync(configFile, `[ recommender ]\nauto_dispatch = false\n`); + expect(silence(() => runConfig(dir, "auto_dispatch", "true", { configFile }))).toBe(0); + const text = readFileSync(configFile, "utf8"); + expect(text).toContain("auto_dispatch = true"); + expect(text.match(/^\s*\[\s*recommender\s*\]/gm)).toHaveLength(1); + expect(loadConfig({ repoPath: configFile }).recommender?.autoDispatch).toBe(true); + }); + + test("rejects an unknown key and an invalid value", () => { + writeFileSync(configFile, `[recommender]\nauto_dispatch = false\n`); + expect(silence(() => runConfig(dir, "nonsense", "true", { configFile }))).toBe(1); + expect(silence(() => runConfig(dir, "auto_dispatch", "yes", { configFile }))).toBe(1); + // The file was not mutated by a rejected call. + expect(readFileSync(configFile, "utf8")).toContain("auto_dispatch = false"); + }); + + test("errors when the config file is missing", () => { + expect(silence(() => runConfig(dir, "auto_dispatch", "true", { configFile }))).toBe(1); + }); +}); diff --git a/packages/cli/test/pause-resume.test.ts b/packages/cli/test/pause-resume.test.ts new file mode 100644 index 00000000..890c17ae --- /dev/null +++ b/packages/cli/test/pause-resume.test.ts @@ -0,0 +1,89 @@ +import { afterEach, beforeEach, describe, expect, spyOn, test } from "bun:test"; +import { mkdirSync, mkdtempSync, rmSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { openAndMigrate } from "@middle/dispatcher/src/db.ts"; +import { getPausedUntil, isPaused } from "@middle/dispatcher/src/repo-config.ts"; +import { runPause, runResume } from "../src/commands/pause.ts"; + +let dir: string; + +beforeEach(() => { + dir = mkdtempSync(join(tmpdir(), "middle-cli-pause-")); + // The commands require a `.git` dir to accept the path as a repo. + mkdirSync(join(dir, ".git"), { recursive: true }); +}); + +afterEach(() => { + rmSync(dir, { recursive: true, force: true }); +}); + +function captureLog(fn: () => Promise): Promise<{ code: number; lines: string[] }> { + const lines: string[] = []; + const log = spyOn(console, "log").mockImplementation((...a: unknown[]) => + lines.push(a.join(" ")), + ); + const err = spyOn(console, "error").mockImplementation((...a: unknown[]) => + lines.push(a.join(" ")), + ); + return fn() + .then((code) => ({ code, lines })) + .finally(() => { + log.mockRestore(); + err.mockRestore(); + }); +} + +describe("mm pause / mm resume", () => { + test("pause sets paused_until; resume clears it (keyed by the resolved slug)", async () => { + const dbPath = join(dir, "db.sqlite3"); + const resolveSlug = async () => "o/r"; + + const paused = await captureLog(() => runPause(dir, { dbPath, resolveSlug })); + expect(paused.code).toBe(0); + + const db = openAndMigrate(dbPath); + try { + expect(isPaused(db, "o/r")).toBe(true); + } finally { + db.close(); + } + + const resumed = await captureLog(() => runResume(dir, { dbPath, resolveSlug })); + expect(resumed.code).toBe(0); + const db2 = openAndMigrate(dbPath); + try { + expect(getPausedUntil(db2, "o/r")).toBeNull(); + expect(isPaused(db2, "o/r")).toBe(false); + } finally { + db2.close(); + } + }); + + test("a slug-resolution failure returns exit 1, not an unhandled rejection", async () => { + const dbPath = join(dir, "db.sqlite3"); + const boom = async () => { + throw new Error("git remote unreadable"); + }; + // `resolve()` runs inside the try, so a throw there surfaces as exit 1 with a + // logged message — for both commands — rather than rejecting the promise. + const paused = await captureLog(() => runPause(dir, { dbPath, resolveSlug: boom })); + expect(paused.code).toBe(1); + expect(paused.lines.join("\n")).toContain("git remote unreadable"); + + const resumed = await captureLog(() => runResume(dir, { dbPath, resolveSlug: boom })); + expect(resumed.code).toBe(1); + expect(resumed.lines.join("\n")).toContain("git remote unreadable"); + }); + + test("a non-git path is rejected with exit 1", async () => { + const notRepo = mkdtempSync(join(tmpdir(), "middle-cli-notrepo-")); + try { + const r = await captureLog(() => runPause(notRepo, { dbPath: join(dir, "db.sqlite3") })); + expect(r.code).toBe(1); + expect(r.lines.join("\n")).toContain("not a git repository"); + } finally { + rmSync(notRepo, { recursive: true, force: true }); + } + }); +}); diff --git a/packages/core/src/adapter.ts b/packages/core/src/adapter.ts index e185339e..705c5b80 100644 --- a/packages/core/src/adapter.ts +++ b/packages/core/src/adapter.ts @@ -114,6 +114,14 @@ export type TranscriptState = { export type BlockedSentinel = { question: string; context?: string; + /** + * Why the agent paused. `"complexity"` marks a **complexity pause** — a + * sub-issue decision needing more candidate forks than `complexity_ceiling` + * (build spec → "Complexity and architectural forks"); the dispatcher surfaces + * it under the state issue's `complexity pause` label. Absent / any other value + * is treated as a plain `"question"`. + */ + kind?: "question" | "complexity"; }; export type StopClassification = diff --git a/packages/dispatcher/src/auto-dispatch.ts b/packages/dispatcher/src/auto-dispatch.ts new file mode 100644 index 00000000..f666678c --- /dev/null +++ b/packages/dispatcher/src/auto-dispatch.ts @@ -0,0 +1,88 @@ +import type { ParsedState } from "@middle/state-issue"; +import { hasFreeSlot, reserveSlot, type SlotState } from "./slots.ts"; + +/** + * The auto-dispatch loop (build spec → "Auto-dispatch loop"). Reads the repo's + * ranked state issue and enqueues every ready Epic that has a free slot, skipping + * rate-limited adapters and exhausted per-adapter slots and stopping when the + * repo or global total is full. It decrements a *local* {@link SlotState} as it + * enqueues so each subsequent row sees fresh headroom without a db round-trip. + * + * There is **no pre-dispatch complexity gate** — complexity is the branching + * factor of a runtime design decision, discovered while the agent works (see + * "Complexity and architectural forks"); a ready Epic dispatches regardless of + * any per-sub-issue complexity, and an overrun pauses *that sub-issue* later (#52). + * + * Triggered (by the daemon) after: a recommender run completes, any workflow + * terminal-state transition, any rate-limit state change, and a manual + * `mm dispatch`. The loop body is the same for every trigger; the deps are + * injected so it runs without the engine or `gh`. + */ +export type AutoDispatchDeps = { + /** The `owner/name` slug whose ready work is dispatched. */ + repo: string; + /** + * Whether auto-dispatch is enabled for this repo right now — the `[recommender] + * auto_dispatch` toggle AND the pause state (#51). A disabled repo is a no-op. + */ + isAutoDispatchEnabled: () => boolean | Promise; + /** Read + parse the repo's state issue (the ranked `readyToDispatch` plan). */ + readState: () => Promise; + /** The set of adapter names currently RATE_LIMITED (reset time still in the future). */ + rateLimitedAdapters: () => Set | Promise>; + /** Snapshot the live slot state once at the start of the pass. */ + getSlotState: () => SlotState; + /** + * Enqueue one implementation workflow. Returns the workflow id, or `null` if + * the enqueue was refused (e.g. the Epic already has an active workflow — the + * collision guard). A refused enqueue must NOT consume a local slot. + */ + enqueue: (input: { repo: string; epicNumber: number; adapter: string }) => Promise; +}; + +/** What an auto-dispatch pass enqueued, and why it stopped. */ +export type AutoDispatchResult = { + /** The Epics enqueued this pass, in dispatch order. */ + enqueued: { epicNumber: number; adapter: string }[]; + /** + * - `disabled` — the repo's auto-dispatch is off (or paused); nothing was read. + * - `slots-exhausted` — the loop stopped because the repo or global total filled. + * - `drained` — every ready row was walked (whatever got enqueued). + */ + reason: "disabled" | "slots-exhausted" | "drained"; +}; + +/** Extract the leading `#` Epic number from a Ready row's `epic` cell, or null. */ +function parseEpicNumber(epic: string): number | null { + const match = /^#(\d+)\b/.exec(epic.trim()); + return match ? Number(match[1]) : null; +} + +/** Run one auto-dispatch pass for a repo. See {@link AutoDispatchDeps}. */ +export async function autoDispatch(deps: AutoDispatchDeps): Promise { + if (!(await deps.isAutoDispatchEnabled())) return { enqueued: [], reason: "disabled" }; + + const state = await deps.readState(); + const rateLimited = await deps.rateLimitedAdapters(); + let slots = deps.getSlotState(); + const enqueued: AutoDispatchResult["enqueued"] = []; + + for (const row of state.readyToDispatch) { + const epicNumber = parseEpicNumber(row.epic); + if (epicNumber === null) continue; // a malformed / empty-state cell — never dispatch it + // Repo or global full → no further row (for any adapter) can dispatch; stop. + if (slots.global.available <= 0 || slots.repo.available <= 0) { + return { enqueued, reason: "slots-exhausted" }; + } + // This row's adapter is blocked, but a later row's adapter may not be. + if (rateLimited.has(row.adapter)) continue; + if (!hasFreeSlot(slots, row.adapter)) continue; // adapter cap exhausted (repo/global checked) + + const workflowId = await deps.enqueue({ repo: deps.repo, epicNumber, adapter: row.adapter }); + if (workflowId === null) continue; // refused (collision) → don't charge a local slot + enqueued.push({ epicNumber, adapter: row.adapter }); + slots = reserveSlot(slots, row.adapter); // local decrement so the next row sees fresh headroom + } + + return { enqueued, reason: "drained" }; +} diff --git a/packages/dispatcher/src/build-deps.ts b/packages/dispatcher/src/build-deps.ts index e92a7a87..471bcbba 100644 --- a/packages/dispatcher/src/build-deps.ts +++ b/packages/dispatcher/src/build-deps.ts @@ -14,7 +14,39 @@ import type { ImplementationDeps, ImplementationInput } from "./workflows/implem import { createWorktree, destroyWorktree } from "./worktree.ts"; /** The slice of {@link GitHubGateway} the deps factory reads. */ -type DepsGitHub = Pick; +type DepsGitHub = Pick< + GitHubGateway, + "findEpicPr" | "getCommentAuthor" | "postComment" | "getIssueLabels" +>; + +/** The label a human applies to an Epic to authorize proceeding past a complexity pause. */ +const APPROVED_LABEL = "approved"; + +/** + * Format the pause comment the dispatcher posts on the Epic when an agent parks. + * A `"complexity"` pause is framed with the literal **complexity pause** label so + * the recommender (reading the Epic) classifies it under the state issue's + * `complexity pause` needs-human label; a plain question reads as an agent + * question. The recommender owns "Needs human input", so this comment is the + * GitHub trace it keys off (the dispatcher never writes that section directly). + */ +export function formatPauseComment(opts: { + question: string; + context?: string; + kind: "question" | "complexity"; +}): string { + const body = opts.context ? `> ${opts.question}\n\n${opts.context}` : `> ${opts.question}`; + if (opts.kind === "complexity") { + return `🧩 **complexity pause** — the agent paused a sub-issue whose decision needs more candidate forks than this repo's \`complexity_ceiling\`. + +${body} + +A human resolves this by **scope reduction or clarification** — or applies the \`${APPROVED_LABEL}\` label to authorize a best-judgment call within the ceiling on resume.`; + } + return `🙋 **agent question** — the dispatched agent needs input to proceed. + +${body}`; +} /** * What the caller binds for the HookServer-dependent part of the deps. The @@ -57,8 +89,15 @@ export type BuildImplementationDepsArgs = { planCommentReader?: PlanCommentReader; /** Resolve the agent's `gh` login once — defaults to the real resolver. */ resolveAgentLogin?: () => Promise; - /** Post the agent's open question on the Epic when it parks (optional). */ + /** + * Surface the agent's pause on the Epic when it parks. Defaults to a + * `gh`-backed comment poster ({@link formatPauseComment}); injectable for tests. + */ postQuestion?: ImplementationDeps["postQuestion"]; + /** Resolve a repo's `complexity_ceiling` for the dispatch brief; defaults to 3 when omitted. */ + resolveComplexityCeiling?: ImplementationDeps["resolveComplexityCeiling"]; + /** Whether an Epic carries the `approved` label (#53); defaults to not-approved when omitted. */ + isEpicApproved?: ImplementationDeps["isEpicApproved"]; launchTimeoutMs?: number; stopTimeoutMs?: number; reviewRoundCap?: number; @@ -120,7 +159,19 @@ export async function buildImplementationDeps( const pr = await github.findEpicPr(repo, epicNumber); return { exists: pr !== null, isDraft: pr?.isDraft ?? false }; }, - postQuestion: args.postQuestion, + // Default surface: comment the pause on the Epic (framed by kind) via `gh`, + // so the recommender can classify a complexity pause under `complexity pause`. + postQuestion: + args.postQuestion ?? + (async ({ repo, epicNumber, question, context, kind }) => { + await github.postComment(repo, epicNumber, formatPauseComment({ question, context, kind })); + }), + resolveComplexityCeiling: args.resolveComplexityCeiling, + // Default: the Epic is approved iff it carries the `approved` label (#53). + isEpicApproved: + args.isEpicApproved ?? + (async (repo, epicNumber) => + (await github.getIssueLabels(repo, epicNumber)).includes(APPROVED_LABEL)), launchTimeoutMs: args.launchTimeoutMs, stopTimeoutMs: args.stopTimeoutMs, reviewRoundCap: args.reviewRoundCap, diff --git a/packages/dispatcher/src/github.ts b/packages/dispatcher/src/github.ts index 07c98d94..82e7614d 100644 --- a/packages/dispatcher/src/github.ts +++ b/packages/dispatcher/src/github.ts @@ -38,6 +38,8 @@ export interface GitHubGateway { editComment(repo: string, commentId: number, body: string): Promise; /** Resolve the author of a comment from its URL; null if unresolvable. */ getCommentAuthor(repo: string, commentUrl: string): Promise; + /** The label names on an issue/Epic (e.g. to check for `approved`). */ + getIssueLabels(repo: string, issueNumber: number): Promise; } async function run( @@ -137,6 +139,28 @@ export const ghGitHub: GitHubGateway = { return prs.find((pr) => closes.test(pr.body)) ?? null; }, + async getIssueLabels(repo, issueNumber) { + const result = await run([ + "gh", + "issue", + "view", + String(issueNumber), + "--repo", + repo, + "--json", + "labels", + "--jq", + ".labels[].name", + ]); + if (result.exitCode !== 0) { + throw new Error(`gh issue view #${issueNumber} labels failed: ${result.stderr.trim()}`); + } + return result.stdout + .split("\n") + .map((l) => l.trim()) + .filter((l) => l !== ""); + }, + async getPullRequest(repo, prNumber) { const result = await run([ "gh", diff --git a/packages/dispatcher/src/hook-server.ts b/packages/dispatcher/src/hook-server.ts index 2f39c0eb..41be6804 100644 --- a/packages/dispatcher/src/hook-server.ts +++ b/packages/dispatcher/src/hook-server.ts @@ -50,8 +50,25 @@ export type ControlPlane = { * worktree). The route maps `null` to 409. */ startDispatch: (input: ControlDispatchInput) => Promise; + /** + * Whether this dispatch has a free slot right now. The route consults it before + * a manual dispatch so `mm dispatch` respects slot limits (a full queue → 429). + * Receives the full {@link ControlDispatchInput} — notably `repoPath` — so the + * slot caps resolve even for a repo the daemon hasn't dispatched yet this + * lifetime (a cold `repoPaths`). Absent → no slot gate. The auto-dispatch loop + * does its own slot accounting and bypasses this route entirely. + */ + slotAvailable?: (input: ControlDispatchInput) => boolean; /** Init-replay events for a fresh `/control/events` subscriber (in-flight rows). */ initEvents?: () => Event[]; + /** + * Fired (best-effort, fire-and-forget) after a successful route dispatch — the + * "manual `mm dispatch`" auto-dispatch trigger (build spec → "Auto-dispatch + * loop"). Only route-initiated dispatches reach it; the auto-dispatch loop + * enqueues via `startDispatch` directly (not the HTTP route), so this never + * re-enters the loop. Absent in gate-only mode and the unit tests. + */ + afterDispatch?: (repo: string) => void; }; /** @@ -359,18 +376,35 @@ export class HookServer implements SessionGate { ); } - const workflowId = await control.startDispatch({ - repo: normalizedRepo, - repoPath, - epicNumber, - adapter, - }); + const dispatchInput = { repo: normalizedRepo, repoPath, epicNumber, adapter }; + + // Manual dispatch respects slot limits — refuse with 429 when the repo/adapter + // has no free slot (build spec → "Auto-dispatch loop": manual force-dispatch + // "still respects slot limits"). Checked before the collision reservation. + if (control.slotAvailable && !control.slotAvailable(dispatchInput)) { + return Response.json( + { error: `no free slot for ${adapter} in ${normalizedRepo}` }, + { status: 429 }, + ); + } + + const workflowId = await control.startDispatch(dispatchInput); if (workflowId === null) { return Response.json( { error: `Epic #${epicNumber} in ${normalizedRepo} already has an active workflow` }, { status: 409 }, ); } + // A manual dispatch is one of the four auto-dispatch triggers: re-run the + // loop so any slots this dispatch didn't take get filled. Best-effort — a + // throw here must not turn a dispatch that already succeeded into a 500. + try { + control.afterDispatch?.(normalizedRepo); + } catch (error) { + console.error( + `[hook-server] afterDispatch failed for ${normalizedRepo}: ${(error as Error).message}`, + ); + } return Response.json({ workflowId }); } diff --git a/packages/dispatcher/src/index.ts b/packages/dispatcher/src/index.ts index b385bd12..313f5986 100644 --- a/packages/dispatcher/src/index.ts +++ b/packages/dispatcher/src/index.ts @@ -10,6 +10,12 @@ * Public surface: * - `buildImplementationDeps` — assemble the implementation workflow's deps + * PR-ready gate (the daemon and any host share this wiring) + * - `autoDispatch` (+ `AutoDispatchDeps`, `AutoDispatchResult`) — the + * slot-and-rate-limit-aware loop that enqueues ready Epics + * - `getSlotState` / `hasFreeSlot` / `reserveSlot` (+ `SlotState`, `SlotLimits`, + * `SlotDimension`) — the concurrency-slot authority the enqueue paths consult + * - `setPausedUntil` / `clearPaused` / `isPaused` / `getPausedUntil` — the + * per-repo pause state (`mm pause`/`mm resume`) the loop's enable-check reads * - `EventHub` (+ `Event`) — the control plane's SSE broadcast hub * - `HookServer` (+ `SessionGate`, `ControlPlane`, `ControlDispatchInput`) — the * hook receiver + `/control` + `/health` surface @@ -20,7 +26,9 @@ * * Where things live: * - `main.ts` — the process entry (`mm start` spawns it); the daemon owns the - * one long-lived engine that hosts every dispatch + review-resume + * one long-lived engine that hosts every dispatch + review-resume, and wires + * the four auto-dispatch triggers + * - `auto-dispatch.ts` — the auto-dispatch loop; `slots.ts` — slot accounting * - `build-deps.ts` — the shared implementation-workflow deps + gate factory * - `event-hub.ts` — the SSE broadcast hub the control plane serves * - `hook-server.ts`, `hook-store.ts` — receive + persist hooks; `/control` + `/health` @@ -37,6 +45,11 @@ * claude-md: true */ export { buildImplementationDeps } from "./build-deps.ts"; +export { autoDispatch } from "./auto-dispatch.ts"; +export type { AutoDispatchDeps, AutoDispatchResult } from "./auto-dispatch.ts"; +export { getSlotState, hasFreeSlot, reserveSlot } from "./slots.ts"; +export type { SlotDimension, SlotLimits, SlotState } from "./slots.ts"; +export { clearPaused, getPausedUntil, isPaused, setPausedUntil } from "./repo-config.ts"; export { EventHub } from "./event-hub.ts"; export type { Event, WorkflowEventData } from "./event-hub.ts"; export { POLLER_INTERVAL_MS, startPoller } from "./poller-cron.ts"; diff --git a/packages/dispatcher/src/main.ts b/packages/dispatcher/src/main.ts index 793fd1d1..2ca5065a 100644 --- a/packages/dispatcher/src/main.ts +++ b/packages/dispatcher/src/main.ts @@ -12,6 +12,7 @@ import { claudeAdapter } from "@middle/adapter-claude"; import type { AgentAdapter } from "@middle/core"; import { loadConfig } from "@middle/core"; import { Engine } from "bunqueue/workflow"; +import { autoDispatch } from "./auto-dispatch.ts"; import { buildImplementationDeps } from "./build-deps.ts"; import { installBunqueueRaceSwallower } from "./bunqueue-race.ts"; import { openAndMigrate } from "./db.ts"; @@ -19,9 +20,13 @@ import { EventHub } from "./event-hub.ts"; import { type ControlPlane, HookServer } from "./hook-server.ts"; import type { RecommenderTrigger } from "./hook-server.ts"; import { DbHookStore } from "./hook-store.ts"; +import { getRateLimitState, setRateLimitObserver } from "./rate-limits.ts"; import { dispatchRecommender, resolveRecommenderOptions } from "./recommender-run.ts"; import { ghPollGateway } from "./poller-gateway.ts"; import { startPoller } from "./poller-cron.ts"; +import { isPaused } from "./repo-config.ts"; +import { getSlotState, hasFreeSlot } from "./slots.ts"; +import { ghStateIssueGateway, readState } from "./state-issue.ts"; import { killSession, status } from "./tmux.ts"; import { startWatchdog } from "./watchdog-cron.ts"; import { @@ -30,8 +35,14 @@ import { listNonTerminalWorkflows, setUpdateWorkflowObserver, } from "./workflow-record.ts"; +import type { ControlDispatchInput } from "./hook-server.ts"; import { createImplementationWorkflow, RESUME_EVENT } from "./workflows/implementation.ts"; +/** Workflow states that free a dispatch slot — a transition into one re-runs auto-dispatch. */ +const SLOT_FREEING_STATES = new Set(["completed", "compensated", "failed", "cancelled"]); +/** Debounce window coalescing a burst of triggers (terminal transitions, etc.) into one pass. */ +const AUTO_DISPATCH_DEBOUNCE_MS = 250; + /** Adapter registry — only `claude` is implemented. */ function getAdapter(name: string): AgentAdapter { if (name !== "claude") throw new Error(`unknown adapter: ${name}`); @@ -68,6 +79,12 @@ async function main(): Promise { // #116) — do NOT add a no-op engine.recover() against the in-memory store. const engine = new Engine({ embedded: true }); + // Declared up front: `scheduleAutoDispatch` (hoisted below) reads it, and a + // slot-freeing broadcast can fire that path the moment the engine observers + // are wired — long before `shutdown` is defined. A `let` initialized later + // would throw a TDZ ReferenceError on that early read. + let shuttingDown = false; + // One place that turns a state change into a `workflow` broadcast (repo/epic // looked up from the row). Fed by two sources below. They overlap on the // states the workflow writes to the row AND bunqueue emits (`completed`, @@ -97,6 +114,9 @@ async function main(): Promise { type: "workflow", data: { id: executionId, repo: row?.repo ?? "", epic: row?.epicNumber ?? null, state }, }); + // Trigger #2: a workflow terminal-state transition freed a slot — re-run + // auto-dispatch for that repo so the next ready Epic takes the slot. + if (row && SLOT_FREEING_STATES.has(state)) scheduleAutoDispatch(row.repo); }; // Source 1: bunqueue-native lifecycle (running/waiting/completed/failed/compensating). @@ -118,10 +138,162 @@ async function main(): Promise { // resolveRepoPath reads it. In-memory — see the durability note on the engine. const repoPaths = new Map(); - // Dashboard "run recommender now" trigger (build spec → Phase 7). Read-only: - // the run rewrites the state issue but `triggerAutoDispatch` stays unwired, so - // nothing auto-dispatches. The run uses an ephemeral port so it never collides - // with the live dispatcher's port. + // ── Auto-dispatch (build spec → "Auto-dispatch loop") ────────────────────── + // The collision-guarded enqueue: the single source of truth for the 409 guard + // (the active-check and reservation run with no intervening await). Both the + // control route AND the auto-dispatch loop enqueue through this — the loop calls + // it directly (not the HTTP route), so its enqueues never re-trigger the loop. + // `source` is recorded on the workflow: `"manual"` for a route dispatch + // (`mm dispatch`), `"auto"` for an auto-dispatch-loop enqueue. + async function startDispatchImpl( + input: ControlDispatchInput, + source: "manual" | "auto", + ): Promise { + const key = epicKey(input.repo, input.epicNumber); + if (inFlightEpics.has(key) || hasNonTerminalEpicWorkflow(db, input.repo, input.epicNumber)) { + return null; + } + inFlightEpics.add(key); + try { + repoPaths.set(input.repo, input.repoPath); + const handle = await engine.start("implementation", { + repo: input.repo, + epicNumber: input.epicNumber, + adapter: input.adapter, + source, + }); + return handle.id; + } catch (error) { + // Start failed → no row will exist to release the reservation via the + // broadcast path, so free the slot here rather than leak it. + inFlightEpics.delete(key); + throw error; + } + } + + /** Resolve a repo's merged slot caps for {@link getSlotState}. */ + function resolveSlotLimits(repoConfig: ReturnType) { + return { + perAdapter: repoConfig.limits?.maxConcurrentPerAdapter ?? {}, + repoMax: repoConfig.limits?.maxConcurrent ?? repoConfig.global.maxConcurrent, + globalMax: repoConfig.global.maxConcurrent, + }; + } + + /** Load a repo's merged config from a checkout path, or null if it can't be read. */ + function loadConfigAt(repoPath: string): ReturnType | null { + try { + return loadConfig({ + globalPath: process.env.MIDDLE_CONFIG, + repoPath: join(repoPath, ".middle", "config.toml"), + }); + } catch { + return null; + } + } + + /** Load a repo's merged config from its registered checkout, or null if unavailable. */ + function loadRepoConfig(repo: string): ReturnType | null { + const repoPath = repoPaths.get(repo); + return repoPath === undefined ? null : loadConfigAt(repoPath); + } + + /** + * Whether a manual dispatch has a free slot right now (manual `mm dispatch` + * respects slot limits — build spec → "Auto-dispatch loop"). Resolves caps from + * the request's own `repoPath`, so the gate holds even on a repo the daemon + * hasn't dispatched yet this lifetime (cold `repoPaths`). Conservative: an + * unreadable config reports a free slot rather than blocking a manual dispatch. + */ + function slotAvailable(input: ControlDispatchInput): boolean { + const repoConfig = loadConfigAt(input.repoPath); + if (!repoConfig) return true; + return hasFreeSlot(getSlotState(db, input.repo, resolveSlotLimits(repoConfig)), input.adapter); + } + + /** The adapter names currently RATE_LIMITED with a reset still in the future. */ + function rateLimitedAdapters(adapters: string[]): Set { + const now = Date.now(); + const limited = new Set(); + for (const adapter of adapters) { + const state = getRateLimitState(db, adapter); + if (state?.status === "RATE_LIMITED" && (state.resetAt === null || state.resetAt > now)) { + limited.add(adapter); + } + } + return limited; + } + + /** Run one auto-dispatch pass for a repo, building deps from its merged config. */ + async function runAutoDispatch(repo: string): Promise { + const repoPath = repoPaths.get(repo); + if (repoPath === undefined) return; // unknown checkout — can't locate the repo + const repoConfig = loadRepoConfig(repo); + if (!repoConfig) return; + const stateIssueNumber = repoConfig.stateIssue?.number; + if (stateIssueNumber === undefined || stateIssueNumber === 0) return; + const limits = resolveSlotLimits(repoConfig); + const adapters = Object.keys(repoConfig.adapters); + const result = await autoDispatch({ + repo, + // Enabled = the per-repo toggle is on AND the repo isn't paused (#51). + isAutoDispatchEnabled: () => + (repoConfig.recommender?.autoDispatch ?? false) && !isPaused(db, repo), + readState: () => readState(ghStateIssueGateway, repo, stateIssueNumber), + rateLimitedAdapters: () => rateLimitedAdapters(adapters), + getSlotState: () => getSlotState(db, repo, limits), + enqueue: ({ repo: r, epicNumber, adapter }) => + startDispatchImpl({ repo: r, repoPath, epicNumber, adapter }, "auto"), + }); + if (result.enqueued.length > 0) { + const list = result.enqueued.map((e) => `#${e.epicNumber}(${e.adapter})`).join(", "); + console.log(`[auto-dispatch] ${repo}: enqueued ${list} — ${result.reason}`); + } + } + + // Debounced, re-entrancy-guarded scheduler so a burst of triggers (many + // terminal transitions, a rate-limit flip) coalesces into one pass per repo, + // and a trigger arriving mid-pass re-runs once after it finishes. + const autoDispatchTimers = new Map>(); + const autoDispatchRunning = new Set(); + const autoDispatchRerun = new Set(); + function scheduleAutoDispatch(repo: string): void { + if (shuttingDown) return; + const existing = autoDispatchTimers.get(repo); + if (existing) clearTimeout(existing); + autoDispatchTimers.set( + repo, + setTimeout(() => { + autoDispatchTimers.delete(repo); + if (autoDispatchRunning.has(repo)) { + autoDispatchRerun.add(repo); + return; + } + autoDispatchRunning.add(repo); + void runAutoDispatch(repo) + .catch((error: unknown) => { + console.error(`[auto-dispatch] ${repo} failed: ${(error as Error).message}`); + }) + .finally(() => { + autoDispatchRunning.delete(repo); + if (autoDispatchRerun.delete(repo)) scheduleAutoDispatch(repo); + }); + }, AUTO_DISPATCH_DEBOUNCE_MS), + ); + } + + // Trigger #3: any rate-limit state change re-runs auto-dispatch for every known + // repo (rate-limit state is cross-repo, keyed by adapter — a reset can unblock + // ready work anywhere). + setRateLimitObserver(() => { + for (const repo of repoPaths.keys()) scheduleAutoDispatch(repo); + }); + + // Dashboard "run recommender now" trigger (build spec → Phase 7). The run + // rewrites the state issue on its own ephemeral engine; Trigger #1 (recommender + // run completes) then schedules an auto-dispatch pass on THIS daemon's engine. + // The repo's checkout is registered up front so that pass can locate it. The + // run uses an ephemeral port so it never collides with the live dispatcher's. const recommenderTrigger: RecommenderTrigger = async ({ repoPath }) => { if (!repoPath) return { status: 400, body: "repoPath required" }; let repoConfig: ReturnType; @@ -135,7 +307,15 @@ async function main(): Promise { } const resolved = await resolveRecommenderOptions(repoPath, repoConfig, getAdapter); if (!resolved.ok) return { status: 400, body: resolved.error }; - void dispatchRecommender({ ...resolved.options, dispatcherPort: 0 }).catch((error: unknown) => { + const repoSlug = resolved.options.repoSlug; + repoPaths.set(repoSlug, repoPath); + void dispatchRecommender({ + ...resolved.options, + dispatcherPort: 0, + // Trigger #1: when the run completes (clean parse + auto_dispatch on), the + // recommender workflow fires this back into the daemon to run the loop. + triggerAutoDispatch: async ({ repo }) => scheduleAutoDispatch(repo), + }).catch((error: unknown) => { console.error(`[main] recommender trigger run failed: ${(error as Error).message}`); }); return { status: 202, body: "recommender run started" }; @@ -155,6 +335,22 @@ async function main(): Promise { return path; }, worktreeRoot: config.global.worktreeRoot, + // The dispatch brief tells the agent its fork budget — the repo's + // `[limits] complexity_ceiling` (default 3), resolved per repo. + resolveComplexityCeiling: (repo) => { + const repoPath = repoPaths.get(repo); + if (repoPath === undefined) return 3; + try { + return ( + loadConfig({ + globalPath: process.env.MIDDLE_CONFIG, + repoPath: join(repoPath, ".middle", "config.toml"), + }).limits?.complexityCeiling ?? 3 + ); + } catch { + return 3; + } + }, // Resume hand-off: a continuation round re-enters the workflow on THIS engine, // so a parked execution and its resume both live where the poller signals. enqueueContinuation: async (input) => { @@ -165,27 +361,14 @@ async function main(): Promise { hub, version, knownAdapter: (name) => name === "claude", - startDispatch: async ({ repo, repoPath, epicNumber, adapter }) => { - const key = epicKey(repo, epicNumber); - // Atomic collision guard: this active-check and the reservation that - // follows run with no await between them, so two concurrent dispatches - // of the same Epic cannot both pass — the second sees the reservation - // (or, once started, the DB row) and gets `null` → 409. - if (inFlightEpics.has(key) || hasNonTerminalEpicWorkflow(db, repo, epicNumber)) { - return null; - } - inFlightEpics.add(key); - try { - repoPaths.set(repo, repoPath); - const handle = await engine.start("implementation", { repo, epicNumber, adapter }); - return handle.id; - } catch (error) { - // Start failed → no row will exist to release the reservation via the - // broadcast path, so free the slot here rather than leak it. - inFlightEpics.delete(key); - throw error; - } - }, + // A route dispatch is a manual `mm dispatch` — recorded `source: 'manual'`. + startDispatch: (input) => startDispatchImpl(input, "manual"), + // Manual dispatch respects slot limits (the loop does its own accounting). + slotAvailable, + // Trigger #4: a manual `mm dispatch` (a route dispatch) re-runs the loop + // so any slot this dispatch didn't claim gets filled. The loop's own + // enqueues bypass the route, so this never re-enters the loop. + afterDispatch: scheduleAutoDispatch, initEvents: () => listNonTerminalWorkflows(db).map((w) => ({ type: "workflow", @@ -223,12 +406,14 @@ async function main(): Promise { console.log(`middle dispatcher up — hooks on :${hookServer.port}, db ${config.global.dbPath}`); - let shuttingDown = false; const shutdown = async (): Promise => { if (shuttingDown) return; shuttingDown = true; // Guard each teardown so a throw/rejection can't skip process.exit. setUpdateWorkflowObserver(null); + setRateLimitObserver(null); + for (const timer of autoDispatchTimers.values()) clearTimeout(timer); + autoDispatchTimers.clear(); try { await stopWatchdog(); } catch (error) { diff --git a/packages/dispatcher/src/rate-limits.ts b/packages/dispatcher/src/rate-limits.ts index 74208fef..c6a1554b 100644 --- a/packages/dispatcher/src/rate-limits.ts +++ b/packages/dispatcher/src/rate-limits.ts @@ -54,6 +54,32 @@ export type SetRateLimitedInput = { now?: number; }; +/** + * An observer notified after a rate-limit state mutation (an adapter flips to + * RATE_LIMITED or back to AVAILABLE). The daemon registers one to re-run + * auto-dispatch — a reset can unblock ready work for any repo (build spec → + * "Auto-dispatch loop": "every rate-limit state change" is a trigger). + * Module-level (process-scoped) and reset to `null` on daemon shutdown. + */ +export type RateLimitObserver = (adapter: string, status: RateLimitStatus) => void; + +let rateLimitObserver: RateLimitObserver | null = null; + +/** Register (or clear, with `null`) the {@link RateLimitObserver}. */ +export function setRateLimitObserver(observer: RateLimitObserver | null): void { + rateLimitObserver = observer; +} + +/** Notify the observer of a state change, never letting it break the write path. */ +function notifyRateLimitObserver(adapter: string, status: RateLimitStatus): void { + if (!rateLimitObserver) return; + try { + rateLimitObserver(adapter, status); + } catch (error) { + console.error(`[rate-limits] observer threw: ${(error as Error).message}`); + } +} + /** Upsert an adapter to `RATE_LIMITED` with its reset time + provenance. */ export function setRateLimited(db: Database, input: SetRateLimitedInput): void { const now = input.now ?? Date.now(); @@ -65,6 +91,7 @@ export function setRateLimited(db: Database, input: SetRateLimitedInput): void { observed_at = excluded.observed_at, source = excluded.source, detail = excluded.detail`, [input.adapter, input.resetAt, now, input.source, input.detail ?? null], ); + notifyRateLimitObserver(input.adapter, "RATE_LIMITED"); } /** Upsert an adapter to `AVAILABLE`, clearing its reset time. */ @@ -77,6 +104,7 @@ export function markAvailable(db: Database, adapter: string, now: number = Date. source = excluded.source, detail = NULL`, [adapter, now, "probe-via-real-work"], ); + notifyRateLimitObserver(adapter, "AVAILABLE"); } /** diff --git a/packages/dispatcher/src/recommender-run.ts b/packages/dispatcher/src/recommender-run.ts index 1a40567e..79cb684c 100644 --- a/packages/dispatcher/src/recommender-run.ts +++ b/packages/dispatcher/src/recommender-run.ts @@ -61,6 +61,13 @@ export type DispatchRecommenderOptions = { runConfig: RecommenderRunConfig; /** Hard cap on the agent run (from `[recommender] agent_timeout_minutes`); undefined → workflow default. */ agentTimeoutMs?: number; + /** + * The auto-dispatch seam (Phase 8). When wired, the recommender workflow fires + * it after a clean run (gated additionally on `runConfig.autoDispatch`) — the + * "recommender run completes" trigger. Left undefined keeps the Phase 7 + * read-only behaviour (nothing auto-dispatches). + */ + triggerAutoDispatch?: (opts: { repo: string; stateIssue: number }) => Promise; /** Test seams; production passes none. */ overrides?: RecommenderRunOverrides; }; @@ -261,7 +268,9 @@ export async function dispatchRecommender( agentTimeoutMs: opts.agentTimeoutMs, gatherContext, surfaceProblem: ov.surfaceProblem ?? ghSurfaceProblem, - // Phase 7 read-only: triggerAutoDispatch intentionally UNWIRED. + // Phase 8: when the caller wires it (the daemon does), the workflow's + // trigger-auto-dispatch step fires it on a clean run with auto_dispatch on. + triggerAutoDispatch: opts.triggerAutoDispatch, }), ); diff --git a/packages/dispatcher/src/repo-config.ts b/packages/dispatcher/src/repo-config.ts new file mode 100644 index 00000000..b7eff8b0 --- /dev/null +++ b/packages/dispatcher/src/repo-config.ts @@ -0,0 +1,64 @@ +import type { Database } from "bun:sqlite"; + +/** + * Per-repo dispatcher state in the `repo_config` table. v1 uses only the + * `paused_until` column — the pause/resume control surface (`mm pause` / + * `mm resume`). A non-null `paused_until` in the future means auto-dispatch is + * suspended for that repo (build spec → "SQLite schema": "if non-null, no + * auto-dispatch"). The other columns (`config_json`, the recommender bookkeeping) + * are reserved for later sync work; the row is created lazily on first pause with + * an empty `config_json` placeholder. + * + * Source of truth: build spec → "SQLite schema" (`repo_config`), "CLI reference" + * (`mm pause`/`mm resume`), and "Auto-dispatch loop". + */ + +/** A pause that never auto-expires — `mm pause` with no duration suspends indefinitely. */ +const INDEFINITE_PAUSE = Number.MAX_SAFE_INTEGER; + +/** + * Set a repo's `paused_until`. Upserts the `repo_config` row, creating it with an + * empty `config_json` placeholder if absent (only `paused_until` + the sync + * timestamp are touched on conflict, so a later config sync isn't clobbered). + * `until` is unix-ms; omit it to pause indefinitely (`mm pause`). + */ +export function setPausedUntil( + db: Database, + repo: string, + until: number = INDEFINITE_PAUSE, + now: number = Date.now(), +): void { + db.run( + `INSERT INTO repo_config (repo, config_json, paused_until, last_synced_at) + VALUES (?, '{}', ?, ?) + ON CONFLICT(repo) DO UPDATE SET paused_until = excluded.paused_until, + last_synced_at = excluded.last_synced_at`, + [repo, until, now], + ); +} + +/** Clear a repo's pause (`mm resume`). A no-op if the repo has no row. */ +export function clearPaused(db: Database, repo: string, now: number = Date.now()): void { + db.run(`UPDATE repo_config SET paused_until = NULL, last_synced_at = ? WHERE repo = ?`, [ + now, + repo, + ]); +} + +/** A repo's `paused_until` (unix-ms), or null if unpaused / no row. */ +export function getPausedUntil(db: Database, repo: string): number | null { + const row = db.query("SELECT paused_until FROM repo_config WHERE repo = ?").get(repo) as { + paused_until: number | null; + } | null; + return row?.paused_until ?? null; +} + +/** + * Whether a repo is paused right now: `paused_until` is set and still in the + * future. A pause whose timestamp has elapsed auto-expires (reads as unpaused), + * so a bounded pause needs no separate cleanup. + */ +export function isPaused(db: Database, repo: string, now: number = Date.now()): boolean { + const until = getPausedUntil(db, repo); + return until !== null && until > now; +} diff --git a/packages/dispatcher/src/slots.ts b/packages/dispatcher/src/slots.ts new file mode 100644 index 00000000..3f8c63c6 --- /dev/null +++ b/packages/dispatcher/src/slots.ts @@ -0,0 +1,111 @@ +import type { Database } from "bun:sqlite"; +import { countActiveImplementationSlots } from "./workflow-record.ts"; + +/** + * Slot accounting — the concurrency authority the dispatcher's enqueue paths + * consult. A "slot" is one live interactive session; a session is held for a + * dispatch's whole non-terminal life (build spec → "Sessions are slot-expensive"), + * so the used count is exactly the non-terminal `implementation` workflow count + * ({@link countActiveImplementationSlots}). The recommender runs on its own + * dedicated slot and is excluded there, so it never counts against these caps. + * + * Three dimensions gate every enqueue, all derived from the same live rows: + * - **per-adapter** (repo-scoped) — `limits.max_concurrent_per_adapter` + * - **per-repo total** — `limits.max_concurrent` + * - **global total** (cross-repo, the shared db) — `global.max_concurrent` + * + * Source of truth: build spec → "Configuration" (`[limits]`), "Auto-dispatch + * loop", and "State issue schema" → "Slot usage". + */ + +/** The configured concurrency caps, merged global + per-repo. */ +export type SlotLimits = { + /** + * Per-adapter cap, keyed by adapter name (`limits.max_concurrent_per_adapter`). + * An adapter absent here has no separate ceiling — it's gated only by the repo + * and global dimensions. + */ + perAdapter: Record; + /** Repo-level total cap (`limits.max_concurrent`). */ + repoMax: number; + /** Global total cap (`global.max_concurrent`) — spans every repo on the shared db. */ + globalMax: number; +}; + +/** One slot dimension: how many are in use, the cap, and the remaining headroom. */ +export type SlotDimension = { + used: number; + max: number; + /** `max - used`, clamped to 0 — a tightened cap never reports negative headroom. */ + available: number; +}; + +/** A repo's slot picture across all three gating dimensions. */ +export type SlotState = { + /** Per-adapter (repo-scoped) dimensions, keyed by adapter name. */ + byAdapter: Record; + /** The repo-total dimension. */ + repo: SlotDimension; + /** The global (cross-repo) dimension. */ + global: SlotDimension; +}; + +/** Build a dimension from a used count and cap, clamping availability to ≥ 0. */ +function dimension(used: number, max: number): SlotDimension { + return { used, max, available: Math.max(0, max - used) }; +} + +/** + * Derive the live slot state for a repo from the `workflows` table + merged + * config. Per-repo `used` (drives `byAdapter` + `repo`) is scoped to this repo; + * `global.used` spans every repo on the shared db — the two are deliberately + * distinct (a repo's per-repo cap must not be charged for another repo's agents). + * The recommender's row is excluded by {@link countActiveImplementationSlots}. + */ +export function getSlotState(db: Database, repo: string, limits: SlotLimits): SlotState { + const repoUsed = countActiveImplementationSlots(db, repo); + const globalUsed = countActiveImplementationSlots(db).total; + const byAdapter: Record = {}; + for (const [adapter, max] of Object.entries(limits.perAdapter)) { + byAdapter[adapter] = dimension(repoUsed.perAdapter[adapter] ?? 0, max); + } + return { + byAdapter, + repo: dimension(repoUsed.total, limits.repoMax), + global: dimension(globalUsed, limits.globalMax), + }; +} + +/** + * Whether an adapter can take a slot right now: the repo and global dimensions + * must both have headroom, and — if the adapter has a per-adapter cap — that + * dimension too. The enqueue guard both the auto-dispatch loop and manual + * dispatch consult; an adapter with no configured per-adapter cap is gated only + * by repo + global. + */ +export function hasFreeSlot(state: SlotState, adapter: string): boolean { + if (state.global.available <= 0 || state.repo.available <= 0) return false; + const adapterDim = state.byAdapter[adapter]; + return adapterDim === undefined || adapterDim.available > 0; +} + +/** Charge one slot against a dimension (used +1, available recomputed). */ +function charge(dim: SlotDimension): SlotDimension { + return dimension(dim.used + 1, dim.max); +} + +/** + * Return a new {@link SlotState} with one slot charged to `adapter` — the repo + * and global dimensions always, plus the adapter's own dimension when it has a + * cap. Pure (the input is left untouched) so the auto-dispatch loop can decrement + * a local view as it enqueues each row, the next row seeing fresh headroom without + * a db round-trip. + */ +export function reserveSlot(state: SlotState, adapter: string): SlotState { + const adapterDim = state.byAdapter[adapter]; + return { + byAdapter: adapterDim ? { ...state.byAdapter, [adapter]: charge(adapterDim) } : state.byAdapter, + repo: charge(state.repo), + global: charge(state.global), + }; +} diff --git a/packages/dispatcher/src/workflow-record.ts b/packages/dispatcher/src/workflow-record.ts index bfa002d0..0314218f 100644 --- a/packages/dispatcher/src/workflow-record.ts +++ b/packages/dispatcher/src/workflow-record.ts @@ -36,19 +36,51 @@ export type CreateWorkflowRecordInput = { repo: string; epicNumber: number | null; adapter: string; + /** + * How the dispatch was initiated — `"manual"` for `mm dispatch`, `"auto"` for + * the auto-dispatch loop (build spec → "Auto-dispatch loop": manual force- + * dispatch is "logged with `source: 'manual'`"). Persisted in `meta_json`. + * Omitted leaves `meta_json` null (e.g. the recommender's own row). + */ + source?: "manual" | "auto"; }; /** Insert a fresh `pending` workflow row. `id` doubles as the bunqueue execution id. */ export function createWorkflowRecord(db: Database, input: CreateWorkflowRecordInput): void { const now = Date.now(); + const metaJson = input.source === undefined ? null : JSON.stringify({ source: input.source }); db.run( `INSERT INTO workflows - (id, kind, repo, epic_number, adapter, state, created_at, updated_at, bunqueue_execution_id) - VALUES (?, ?, ?, ?, ?, 'pending', ?, ?, ?)`, - [input.id, input.kind, input.repo, input.epicNumber, input.adapter, now, now, input.id], + (id, kind, repo, epic_number, adapter, state, created_at, updated_at, bunqueue_execution_id, meta_json) + VALUES (?, ?, ?, ?, ?, 'pending', ?, ?, ?, ?)`, + [ + input.id, + input.kind, + input.repo, + input.epicNumber, + input.adapter, + now, + now, + input.id, + metaJson, + ], ); } +/** Read a workflow's `meta_json.source` (`'manual'`/`'auto'`), or null if unset. */ +export function getWorkflowSource(db: Database, id: string): "manual" | "auto" | null { + const row = db.query("SELECT meta_json FROM workflows WHERE id = ?").get(id) as { + meta_json: string | null; + } | null; + if (!row?.meta_json) return null; + try { + const source = (JSON.parse(row.meta_json) as { source?: unknown }).source; + return source === "manual" || source === "auto" ? source : null; + } catch { + return null; + } +} + export type WorkflowPatch = { state?: WorkflowState; worktreePath?: string; diff --git a/packages/dispatcher/src/workflows/implementation.ts b/packages/dispatcher/src/workflows/implementation.ts index 0032cb39..1e41232a 100644 --- a/packages/dispatcher/src/workflows/implementation.ts +++ b/packages/dispatcher/src/workflows/implementation.ts @@ -38,6 +38,12 @@ export type ImplementationInput = { repo: string; epicNumber: number; adapter: string; + /** + * How the dispatch was initiated: `"manual"` (`mm dispatch`) or `"auto"` (the + * auto-dispatch loop). Recorded on the workflow row's `meta_json`. Defaults to + * `"auto"` when omitted; a continuation carries its origin forward. + */ + source?: "manual" | "auto"; /** * Present only on a continuation execution (a resume). Absent on the initial * dispatch. When set, `prepare-worktree` reuses `resume.worktree` instead of @@ -105,17 +111,35 @@ export type ImplementationDeps = { launchTimeoutMs?: number; stopTimeoutMs?: number; /** - * Post the agent's open question on the Epic for human visibility when it - * parks on `asked-question`. Receives the sentinel contents `classifyStop` - * surfaced (`question` + optional `context`). Optional + injectable so tests - * need no `gh`; the default (wired by the dispatcher) comments on the issue. + * Surface the agent's pause on the Epic for human visibility when it parks on + * `asked-question`. Receives the sentinel contents `classifyStop` surfaced + * (`question` + optional `context`) plus the pause `kind`: a `"complexity"` + * pause is surfaced so the recommender classifies it under the `complexity + * pause` state-issue label, vs. a plain `"question"`. Optional + injectable so + * tests need no `gh`; the default (wired by the dispatcher) comments on the issue. */ postQuestion?: (opts: { repo: string; epicNumber: number; question: string; context?: string; + kind: "question" | "complexity"; }) => Promise; + /** + * The repo's `complexity_ceiling` (`[limits] complexity_ceiling`, default 3) — + * the max fork branching factor the agent resolves itself before pausing the + * sub-issue (build spec → "Complexity and architectural forks"). Injected into + * the dispatch brief so the agent knows its fork budget. Resolved per repo + * (the deps are shared across repos); defaults to 3 when unwired. + */ + resolveComplexityCeiling?: (repo: string) => number | Promise; + /** + * Whether the Epic carries the `approved` label — a human has reviewed its + * scope and authorized the agent to proceed past a complexity overrun with a + * best-judgment call instead of pausing (#53). Reflected in the dispatch brief. + * Optional + injectable; defaults to `false` (not approved) when unwired. + */ + isEpicApproved?: (repo: string, epicNumber: number) => boolean | Promise; /** * Enqueue a continuation execution for the next round (a resume). Injected so * the workflow stays free of the engine: in prod the dispatcher wires this to @@ -159,6 +183,8 @@ const DEFAULT_STOP_TIMEOUT_MS = 4 * 60 * 60 * 1000; const DEFAULT_REVIEW_ROUND_CAP = 5; const DEFAULT_MAX_NUDGES = 3; const DEFAULT_NUDGE_STOP_TIMEOUT_MS = 30 * 60 * 1000; +/** Spec default for `[limits] complexity_ceiling` when no per-repo resolver is wired. */ +const DEFAULT_COMPLEXITY_CEILING = 3; /** * Session names are deterministic so compensations can recompute them, and @@ -179,30 +205,56 @@ function sessionNameFor(input: ImplementationInput): string { * did that). An operator-supplied brief (committed in the repo, or written by a * future `mm dispatch --note` / the recommender) is left untouched. */ -function ensurePromptFile(worktreePath: string, epicNumber: number): void { +function ensurePromptFile( + worktreePath: string, + epicNumber: number, + complexityCeiling: number, + approved: boolean, +): void { const middleDir = join(worktreePath, ".middle"); const promptPath = join(middleDir, "prompt.md"); if (existsSync(promptPath)) return; mkdirSync(middleDir, { recursive: true }); - writeFileSync( - promptPath, - `# middle dispatch brief — Epic #${epicNumber} + writeFileSync(promptPath, defaultDispatchBrief(epicNumber, complexityCeiling, approved)); +} + +/** + * The default dispatch brief written to `.middle/prompt.md`. Carries the repo's + * `complexity_ceiling` so the agent knows its fork budget (the max candidate + * forks it may resolve itself before pausing the sub-issue). When the Epic + * carries the `approved` label, the brief authorizes the agent to proceed past a + * complexity overrun with a best-judgment call instead of pausing (build spec → + * "Complexity and architectural forks"; #53). + */ +function defaultDispatchBrief( + epicNumber: number, + complexityCeiling: number, + approved: boolean, +): string { + const complexityRule = approved + ? `- This Epic carries the \`approved\` label: a human has reviewed its scope and + authorized you to proceed past a complexity overrun. If a sub-issue decision + would need more than ${complexityCeiling} candidate forks (the complexity ceiling), + do NOT pause — make a best-judgment call within the ceiling and keep going.` + : `- Pause only if you are genuinely blocked: ambiguous acceptance criteria, or a + decision needing more than ${complexityCeiling} candidate forks (the complexity + ceiling) to resolve. To pause, write \`.middle/blocked.json\` and exit; for a + complexity overrun include \`"kind": "complexity"\` in it.`; + return `# middle dispatch brief — Epic #${epicNumber} You are running autonomously under middle. There is no human watching in real time. Operating rules for this dispatch: - Work through every phase continuously. The mechanical verification gates are the gates between phases — do not pause for confirmation between them. -- Do not stop to ask questions you can resolve yourself. Pause only if you are - genuinely blocked: ambiguous acceptance criteria, or a decision needing more - candidate forks than the complexity ceiling. +- Do not stop to ask questions you can resolve yourself. +${complexityRule} - The terminal state is: every phase verified, the PR marked ready for review, and the reviewer's brief posted on both the Epic and the PR. Then stop. ## Operator notes for this dispatch (none) -`, - ); +`; } /** @@ -417,6 +469,7 @@ export function createImplementationWorkflow( repo: ctx.input.repo, epicNumber: ctx.input.epicNumber, adapter: ctx.input.adapter, + source: ctx.input.source ?? "auto", }); const resume = ctx.input.resume; if (resume) { @@ -466,8 +519,29 @@ export function createImplementationWorkflow( updateWorkflow(deps.db, ctx.executionId, { state: "launching", sessionName, sessionToken }); try { + // Build the default brief only when one isn't already present (a resume + // wrote its own brief in prepare-worktree; an operator brief is preserved). + // Resolving the per-repo ceiling / `approved` label touches `gh`, so it's + // gated on that and made failure-safe — a flaky label read must fall back to + // safe defaults, never fail the whole dispatch. console.error(`${tag} ensuring .middle/prompt.md exists in worktree`); - ensurePromptFile(handle.path, ctx.input.epicNumber); + if (!existsSync(join(handle.path, ".middle", "prompt.md"))) { + let complexityCeiling = DEFAULT_COMPLEXITY_CEILING; + let approved = false; + try { + if (deps.resolveComplexityCeiling) { + complexityCeiling = await deps.resolveComplexityCeiling(ctx.input.repo); + } + if (deps.isEpicApproved) { + approved = await deps.isEpicApproved(ctx.input.repo, ctx.input.epicNumber); + } + } catch (error) { + console.error( + `${tag} brief-context resolution failed, using defaults (ceiling=${DEFAULT_COMPLEXITY_CEILING}, approved=false): ${(error as Error).message}`, + ); + } + ensurePromptFile(handle.path, ctx.input.epicNumber, complexityCeiling, approved); + } console.error(`${tag} installing hooks in ${handle.path}`); await adapter.installHooks({ @@ -614,12 +688,16 @@ export function createImplementationWorkflow( ); updateWorkflow(deps.db, ctx.executionId, { state: "waiting-human" }); if (outcome.kind === "asked-question" && deps.postQuestion) { + // The sentinel's `kind` distinguishes a complexity pause (surfaced under the + // `complexity pause` state-issue label) from a plain question. + const kind = outcome.sentinel?.kind === "complexity" ? "complexity" : "question"; try { await deps.postQuestion({ repo: ctx.input.repo, epicNumber: ctx.input.epicNumber, question: outcome.sentinel?.question ?? "(question text unavailable)", context: outcome.sentinel?.context, + kind, }); } catch (error) { // Visibility is best-effort — the wait is already armed and durable, so @@ -739,6 +817,7 @@ export function createImplementationWorkflow( repo: ctx.input.repo, epicNumber: ctx.input.epicNumber, adapter: ctx.input.adapter, + source: ctx.input.source, // a continuation keeps the origin of its workstream resume: { reason: payload.reason, round: nextRound, worktree: handle, payload }, }); // The drive that just parked ran a working adapter; revert any stale diff --git a/packages/dispatcher/test/auto-dispatch.test.ts b/packages/dispatcher/test/auto-dispatch.test.ts new file mode 100644 index 00000000..d136db27 --- /dev/null +++ b/packages/dispatcher/test/auto-dispatch.test.ts @@ -0,0 +1,233 @@ +import { describe, expect, test } from "bun:test"; +import type { ParsedState, ReadyRow } from "@middle/state-issue"; +import { autoDispatch, type AutoDispatchDeps } from "../src/auto-dispatch.ts"; +import type { SlotState } from "../src/slots.ts"; + +// The auto-dispatch loop (#50): walk `readyToDispatch`, skip rate-limited +// adapters and exhausted per-adapter slots, stop on a full repo/global, and +// decrement a local slot view as it enqueues. Disabled repos do nothing. The +// deps are injected so the loop is exercised without the engine or `gh`. + +function readyRow(rank: number, epicNumber: number, adapter: string): ReadyRow { + return { + rank, + epic: `#${epicNumber} some title`, + adapter, + subIssues: 2, + reason: "criteria clear", + }; +} + +function stateWith(rows: ReadyRow[]): ParsedState { + return { + version: 1, + generated: "2026-05-24T00:00:00.000Z", + runId: "abcd1234", + intervalMinutes: 15, + readyToDispatch: rows, + needsHumanInput: [], + blocked: [], + inFlight: [], + excluded: [], + rateLimits: { claude: "AVAILABLE", codex: "AVAILABLE", github: "UNKNOWN" }, + slotUsage: { + adapters: [ + { adapter: "claude", used: 0, max: 2 }, + { adapter: "codex", used: 0, max: 1 }, + ], + total: { used: 0, max: 3 }, + global: { used: 0, max: 4 }, + }, + }; +} + +function slots(opts: { + claude?: { used: number; max: number }; + codex?: { used: number; max: number }; + repo: { used: number; max: number }; + global: { used: number; max: number }; +}): SlotState { + const dim = (d: { used: number; max: number }) => ({ + ...d, + available: Math.max(0, d.max - d.used), + }); + const byAdapter: SlotState["byAdapter"] = {}; + if (opts.claude) byAdapter.claude = dim(opts.claude); + if (opts.codex) byAdapter.codex = dim(opts.codex); + return { byAdapter, repo: dim(opts.repo), global: dim(opts.global) }; +} + +type EnqueueCall = { repo: string; epicNumber: number; adapter: string }; + +function makeDeps(overrides: Partial & { _enqueued?: EnqueueCall[] } = {}): { + deps: AutoDispatchDeps; + enqueued: EnqueueCall[]; +} { + const enqueued: EnqueueCall[] = overrides._enqueued ?? []; + const deps: AutoDispatchDeps = { + repo: "o/r", + isAutoDispatchEnabled: () => true, + readState: async () => stateWith([readyRow(1, 101, "claude"), readyRow(2, 102, "codex")]), + rateLimitedAdapters: () => new Set(), + getSlotState: () => + slots({ + claude: { used: 0, max: 2 }, + codex: { used: 0, max: 1 }, + repo: { used: 0, max: 3 }, + global: { used: 0, max: 4 }, + }), + enqueue: async (input) => { + enqueued.push(input); + return `wf-${input.epicNumber}`; + }, + ...overrides, + }; + return { deps, enqueued }; +} + +describe("autoDispatch", () => { + test("normal pass: enqueues every ready row that has a free slot", async () => { + const { deps, enqueued } = makeDeps(); + const result = await autoDispatch(deps); + expect(enqueued).toEqual([ + { repo: "o/r", epicNumber: 101, adapter: "claude" }, + { repo: "o/r", epicNumber: 102, adapter: "codex" }, + ]); + expect(result.enqueued).toEqual([ + { epicNumber: 101, adapter: "claude" }, + { epicNumber: 102, adapter: "codex" }, + ]); + expect(result.reason).toBe("drained"); + }); + + test("does nothing for a repo whose auto-dispatch is disabled", async () => { + const { deps, enqueued } = makeDeps({ isAutoDispatchEnabled: () => false }); + const result = await autoDispatch(deps); + expect(enqueued).toEqual([]); + expect(result.reason).toBe("disabled"); + }); + + test("skips a rate-limited adapter but keeps dispatching others", async () => { + const { deps, enqueued } = makeDeps({ + rateLimitedAdapters: () => new Set(["claude"]), + }); + const result = await autoDispatch(deps); + // #101 (claude) skipped; #102 (codex) still dispatched. + expect(enqueued).toEqual([{ repo: "o/r", epicNumber: 102, adapter: "codex" }]); + expect(result.reason).toBe("drained"); + }); + + test("skips a row whose per-adapter slot is exhausted, continues to the next adapter", async () => { + const { deps, enqueued } = makeDeps({ + getSlotState: () => + slots({ + claude: { used: 2, max: 2 }, // claude full + codex: { used: 0, max: 1 }, + repo: { used: 2, max: 3 }, + global: { used: 2, max: 4 }, + }), + }); + const result = await autoDispatch(deps); + expect(enqueued).toEqual([{ repo: "o/r", epicNumber: 102, adapter: "codex" }]); + expect(result.reason).toBe("drained"); + }); + + test("stops entirely when the repo total is exhausted (slots-exhausted)", async () => { + const { deps, enqueued } = makeDeps({ + getSlotState: () => + slots({ + claude: { used: 0, max: 2 }, + codex: { used: 0, max: 1 }, + repo: { used: 3, max: 3 }, // repo full → break before any enqueue + global: { used: 3, max: 4 }, + }), + }); + const result = await autoDispatch(deps); + expect(enqueued).toEqual([]); + expect(result.reason).toBe("slots-exhausted"); + }); + + test("stops when the global total is exhausted even if the repo has room", async () => { + const { deps, enqueued } = makeDeps({ + getSlotState: () => + slots({ + claude: { used: 0, max: 2 }, + codex: { used: 0, max: 1 }, + repo: { used: 1, max: 3 }, + global: { used: 4, max: 4 }, // global full + }), + }); + const result = await autoDispatch(deps); + expect(enqueued).toEqual([]); + expect(result.reason).toBe("slots-exhausted"); + }); + + test("decrements local counters as it enqueues so a shared cap stops mid-pass", async () => { + // Two claude rows but repo cap leaves room for only one: the first reserves + // the last slot, the second sees repo exhausted and the loop stops. + const { deps, enqueued } = makeDeps({ + readState: async () => stateWith([readyRow(1, 201, "claude"), readyRow(2, 202, "claude")]), + getSlotState: () => + slots({ + claude: { used: 0, max: 5 }, + repo: { used: 2, max: 3 }, // only 1 repo slot left + global: { used: 2, max: 8 }, + }), + }); + const result = await autoDispatch(deps); + expect(enqueued).toEqual([{ repo: "o/r", epicNumber: 201, adapter: "claude" }]); + expect(result.reason).toBe("slots-exhausted"); + }); + + test("a refused enqueue (collision/null) does not consume a local slot", async () => { + // The first row collides (enqueue → null): it must not decrement the local + // view, so the second row still sees the slot it would otherwise have lost. + let first = true; + const { deps, enqueued } = makeDeps({ + readState: async () => stateWith([readyRow(1, 301, "claude"), readyRow(2, 302, "claude")]), + getSlotState: () => + slots({ + claude: { used: 0, max: 5 }, + repo: { used: 2, max: 3 }, // 1 slot; if the collision wrongly consumed it, #302 would be skipped + global: { used: 2, max: 8 }, + }), + enqueue: async (input) => { + if (first) { + first = false; + return null; // collision + } + enqueued.push(input); + return `wf-${input.epicNumber}`; + }, + }); + const result = await autoDispatch(deps); + expect(enqueued).toEqual([{ repo: "o/r", epicNumber: 302, adapter: "claude" }]); + // Both rows were walked (the collision was a no-op, the second dispatched), + // so the loop drained rather than breaking on exhaustion. + expect(result.reason).toBe("drained"); + }); + + test("ignores the empty-state (no ready rows) without enqueuing", async () => { + const { deps, enqueued } = makeDeps({ readState: async () => stateWith([]) }); + const result = await autoDispatch(deps); + expect(enqueued).toEqual([]); + expect(result.reason).toBe("drained"); + }); + + test("no pre-dispatch complexity gate: a large-sub-issue Epic still dispatches (#52)", async () => { + // The loop's only gates are slots + rate limits — never sub-issue count or any + // complexity estimate. A ready Epic dispatches; a complexity overrun is a + // runtime pause on a sub-issue, not a pre-dispatch decision the loop makes. + const big: ReadyRow = { + rank: 1, + epic: "#401 huge epic", + adapter: "claude", + subIssues: 99, + reason: "many phases", + }; + const { deps, enqueued } = makeDeps({ readState: async () => stateWith([big]) }); + const result = await autoDispatch(deps); + expect(enqueued).toEqual([{ repo: "o/r", epicNumber: 401, adapter: "claude" }]); + expect(result.reason).toBe("drained"); + }); +}); diff --git a/packages/dispatcher/test/build-deps.test.ts b/packages/dispatcher/test/build-deps.test.ts index f703f94e..f2f21991 100644 --- a/packages/dispatcher/test/build-deps.test.ts +++ b/packages/dispatcher/test/build-deps.test.ts @@ -3,7 +3,7 @@ import { mkdtempSync, rmSync } from "node:fs"; import { tmpdir } from "node:os"; import { join } from "node:path"; import type { AgentAdapter } from "@middle/core"; -import { buildImplementationDeps } from "../src/build-deps.ts"; +import { buildImplementationDeps, formatPauseComment } from "../src/build-deps.ts"; import { openAndMigrate } from "../src/db.ts"; import type { PrReadyGateHandler } from "../src/gates/pr-ready-handler.ts"; import type { PullRequest } from "../src/github.ts"; @@ -62,6 +62,8 @@ describe("buildImplementationDeps", () => { return epicPr; }, getCommentAuthor: async () => null, + postComment: async () => {}, + getIssueLabels: async () => [], }, bindServer: (gate) => { boundGate = gate; @@ -111,6 +113,8 @@ describe("buildImplementationDeps", () => { github: { findEpicPr: async () => null, getCommentAuthor: async () => null, + postComment: async () => {}, + getIssueLabels: async () => [], }, bindServer: () => ({ sessionGate: noopGate, dispatcherUrl: "http://127.0.0.1:1" }), }); @@ -126,4 +130,61 @@ describe("buildImplementationDeps", () => { expect(src).not.toContain("bunqueue"); expect(src).not.toMatch(/new Engine/); }); + + test("the default postQuestion posts a gh comment framed by pause kind", async () => { + const posted: Array<{ repo: string; issue: number; body: string }> = []; + const db = openAndMigrate(dbPath); + try { + const { deps } = await buildImplementationDeps({ + db, + getAdapter: () => fakeAdapter(), + resolveRepoPath: () => "/p", + worktreeRoot: dir, + enqueueContinuation: async () => {}, + resolveAgentLogin: async () => undefined, + github: { + findEpicPr: async () => null, + getCommentAuthor: async () => null, + postComment: async (repo, issue, body) => { + posted.push({ repo, issue, body }); + }, + getIssueLabels: async () => [], + }, + bindServer: () => ({ sessionGate: noopGate, dispatcherUrl: "http://127.0.0.1:1" }), + }); + await deps.postQuestion!({ + repo: "o/r", + epicNumber: 7, + question: "4 designs, no winner", + context: "A/B/C/D", + kind: "complexity", + }); + expect(posted).toHaveLength(1); + expect(posted[0]!.repo).toBe("o/r"); + expect(posted[0]!.issue).toBe(7); + // The complexity-pause framing the recommender keys off for its label. + expect(posted[0]!.body).toContain("complexity pause"); + expect(posted[0]!.body).toContain("4 designs, no winner"); + } finally { + db.close(); + } + }); +}); + +describe("formatPauseComment", () => { + test("a complexity pause carries the `complexity pause` label vocabulary", () => { + const body = formatPauseComment({ question: "Q", context: "C", kind: "complexity" }); + expect(body).toContain("complexity pause"); + expect(body).toContain("complexity_ceiling"); + expect(body).toContain("approved"); + expect(body).toContain("> Q"); + expect(body).toContain("C"); + }); + + test("a plain question reads as an agent question, not a complexity pause", () => { + const body = formatPauseComment({ question: "Q", kind: "question" }); + expect(body).toContain("agent question"); + expect(body).not.toContain("complexity pause"); + expect(body).toContain("> Q"); + }); }); diff --git a/packages/dispatcher/test/control-routes.test.ts b/packages/dispatcher/test/control-routes.test.ts index 0d5862f9..21fe43a2 100644 --- a/packages/dispatcher/test/control-routes.test.ts +++ b/packages/dispatcher/test/control-routes.test.ts @@ -101,6 +101,61 @@ describe("HookServer control routes", () => { expect(startCalls).toEqual([]); }); + test("POST /control/dispatch refuses with 429 when no slot is available (manual respects limits)", async () => { + startWith(makeControl({ slotAvailable: () => false })); + const res = await fetch(`${base}/control/dispatch`, { + method: "POST", + body: JSON.stringify({ + repo: "o/r", + repoPath: "/abs/checkout", + epicNumber: 7, + adapter: "claude", + }), + }); + expect(res.status).toBe(429); + // The slot gate runs before the dispatch, so nothing started. + expect(startCalls).toEqual([]); + }); + + test("POST /control/dispatch proceeds when a slot is available", async () => { + startWith(makeControl({ slotAvailable: () => true })); + const res = await fetch(`${base}/control/dispatch`, { + method: "POST", + body: JSON.stringify({ + repo: "o/r", + repoPath: "/abs/checkout", + epicNumber: 7, + adapter: "claude", + }), + }); + expect(res.status).toBe(200); + expect(startCalls).toHaveLength(1); + }); + + test("POST /control/dispatch survives a throwing afterDispatch (best-effort, still 200)", async () => { + // The post-dispatch trigger is best-effort: a throw must not turn a dispatch + // that already succeeded into a 500. + startWith( + makeControl({ + afterDispatch: () => { + throw new Error("scheduler boom"); + }, + }), + ); + const res = await fetch(`${base}/control/dispatch`, { + method: "POST", + body: JSON.stringify({ + repo: "o/r", + repoPath: "/abs/checkout", + epicNumber: 7, + adapter: "claude", + }), + }); + expect(res.status).toBe(200); + expect(await res.json()).toEqual({ workflowId: "wf-abc" }); + expect(startCalls).toHaveLength(1); + }); + test("POST /control/dispatch rejects a colliding Epic with 409", async () => { startWith(makeControl()); collisionEpics.add(7); diff --git a/packages/dispatcher/test/implementation-workflow.test.ts b/packages/dispatcher/test/implementation-workflow.test.ts index 989154c6..ac3a8357 100644 --- a/packages/dispatcher/test/implementation-workflow.test.ts +++ b/packages/dispatcher/test/implementation-workflow.test.ts @@ -9,7 +9,7 @@ import { Engine } from "bunqueue/workflow"; import { openAndMigrate } from "../src/db.ts"; import type { SessionGate } from "../src/hook-server.ts"; import { getRateLimitState, setRateLimited } from "../src/rate-limits.ts"; -import { getWaitForSignal, getWorkflow } from "../src/workflow-record.ts"; +import { getWaitForSignal, getWorkflow, getWorkflowSource } from "../src/workflow-record.ts"; import { createImplementationWorkflow, RESUME_EVENT, @@ -314,6 +314,156 @@ function readPromptBrief(workflowId: string): string { return readFileSync(join(path, ".middle", "prompt.md"), "utf8"); } +describe("implementation workflow — complexity pause (#52)", () => { + test("a complexity-kind pause routes to waiting-human and surfaces with kind 'complexity'", async () => { + const surfaced: Array<{ kind: string; question: string }> = []; + const deps = makeDeps({ + getAdapter: () => + makeAdapterStub({ + kind: "asked-question", + sentinelPath: "/x/.middle/blocked.json", + sentinel: { + question: "4 viable persistence designs, no clear winner", + context: "A/B/C/D all plausible", + kind: "complexity", + }, + }), + postQuestion: async (opts) => { + surfaced.push({ kind: opts.kind, question: opts.question }); + }, + }); + const id = await start(deps); + await awaitParked(id); // asserts the row reads waiting-human + expect(surfaced).toEqual([ + { kind: "complexity", question: "4 viable persistence designs, no clear winner" }, + ]); + }); + + test("a plain question pause surfaces with kind 'question' (the default)", async () => { + const surfaced: Array<{ kind: string }> = []; + const deps = makeDeps({ + getAdapter: () => + makeAdapterStub({ + kind: "asked-question", + sentinelPath: "/x/.middle/blocked.json", + sentinel: { question: "Which API base URL?" }, // no kind → question + }), + postQuestion: async (opts) => { + surfaced.push({ kind: opts.kind }); + }, + }); + const id = await start(deps); + await awaitParked(id); + expect(surfaced).toEqual([{ kind: "question" }]); + }); + + test("the dispatch brief carries the repo's complexity_ceiling as the agent's fork budget", async () => { + const deps = makeDeps({ + resolveComplexityCeiling: () => 5, + getAdapter: () => + makeAdapterStub({ + kind: "asked-question", + sentinelPath: "/x/.middle/blocked.json", + sentinel: { question: "park to keep the worktree" }, + }), + postQuestion: async () => {}, + }); + const id = await start(deps); + await awaitParked(id); // waiting-human keeps the worktree so the brief is readable + const brief = readPromptBrief(id); + expect(brief).toContain("more than 5 candidate forks"); + // Not approved by default → the brief tells the agent to pause, not push past. + expect(brief).toContain('"kind": "complexity"'); + expect(brief).not.toContain("approved"); + }); + + test("an in-ceiling decision never surfaces a complexity pause", async () => { + // No complexity sentinel: the agent resolved its decisions within the ceiling. + // A `done` parks for *review* (waiting-human), which is NOT a complexity pause — + // the only thing that surfaces a pause is an asked-question stop, and this isn't + // one, so postQuestion is never called. + let surfaced = false; + const deps = makeDeps({ + getAdapter: () => makeAdapterStub({ kind: "done" }), + postQuestion: async () => { + surfaced = true; + }, + }); + const id = await start(deps); + await awaitRow(id, "waiting-human"); // the review park + // Give any stray surface call a chance to land before asserting it didn't. + await Bun.sleep(50); + expect(surfaced).toBe(false); + }); + + test("an approved Epic's brief authorizes proceeding past a complexity overrun (#53)", async () => { + const deps = makeDeps({ + isEpicApproved: () => true, + resolveComplexityCeiling: () => 3, + getAdapter: () => + makeAdapterStub({ + kind: "asked-question", + sentinelPath: "/x/.middle/blocked.json", + sentinel: { question: "park to keep the worktree" }, + }), + postQuestion: async () => {}, + }); + const id = await start(deps); + await awaitParked(id); + const brief = readPromptBrief(id); + // Approved → proceed past, do not pause. + expect(brief).toContain("`approved` label"); + expect(brief).toContain("do NOT pause"); + expect(brief).toContain("best-judgment call"); + }); + + test("a flaky brief-context read falls back to safe defaults, never failing the dispatch", async () => { + const deps = makeDeps({ + resolveComplexityCeiling: () => { + throw new Error("gh rate limited"); + }, + isEpicApproved: () => { + throw new Error("gh rate limited"); + }, + getAdapter: () => + makeAdapterStub({ + kind: "asked-question", + sentinelPath: "/x/.middle/blocked.json", + sentinel: { question: "park" }, + }), + postQuestion: async () => {}, + }); + const id = await start(deps); + await awaitParked(id); // parked, not failed — the throw didn't abort the drive + const brief = readPromptBrief(id); + expect(brief).toContain("more than 3 candidate forks"); // default ceiling + expect(brief).not.toContain("`approved` label"); // default: not approved + }); +}); + +describe("implementation workflow — dispatch source (#53)", () => { + test("records source 'manual' for a manual dispatch and 'auto' by default", async () => { + const manualDeps = makeDeps({ + getAdapter: () => + makeAdapterStub({ + kind: "asked-question", + sentinelPath: "/x/.middle/blocked.json", + sentinel: { question: "park" }, + }), + postQuestion: async () => {}, + }); + engine.register(createImplementationWorkflow(manualDeps)); + const manual = await engine.start("implementation", { ...INPUT, source: "manual" as const }); + await awaitParked(manual.id); + expect(getWorkflowSource(db, manual.id)).toBe("manual"); + + // A fresh dispatch with no source defaults to 'auto'. + const auto = await engine.start("implementation", { ...INPUT, epicNumber: 99 }); + await awaitParked(auto.id); + expect(getWorkflowSource(db, auto.id)).toBe("auto"); + }); +}); + describe("implementation workflow — asked-question park → answer → resume (e2e)", () => { test("parks on asked-question, a human reply resumes a fresh continuation with the answer injected", async () => { const tmux = makeTmuxStub(); diff --git a/packages/dispatcher/test/recommender-run.test.ts b/packages/dispatcher/test/recommender-run.test.ts index 0e2741f3..ab3948b4 100644 --- a/packages/dispatcher/test/recommender-run.test.ts +++ b/packages/dispatcher/test/recommender-run.test.ts @@ -160,9 +160,9 @@ describe("dispatchRecommender — enqueues a recommender workflow (read-only)", } }); - test("is read-only: a clean run never auto-dispatches (triggerAutoDispatch stays unwired)", async () => { - // The override bag has no triggerAutoDispatch seam, and dispatchRecommender - // never wires one — so even with autoDispatch true in config, nothing dispatches. + test("read-only by default: with no triggerAutoDispatch wired, a clean run dispatches nothing", async () => { + // baseOptions carries no triggerAutoDispatch seam, so even with autoDispatch + // true in config the workflow's trigger step is a no-op — the Phase 7 default. const dbPath = join(scratch, "db.sqlite3"); const opts = baseOptions(dbPath, makeOverrides()); opts.runConfig.autoDispatch = true; @@ -177,4 +177,30 @@ describe("dispatchRecommender — enqueues a recommender workflow (read-only)", db.close(); } }); + + test("fires triggerAutoDispatch on a clean run when wired and auto_dispatch is on (trigger #1)", async () => { + const dbPath = join(scratch, "db.sqlite3"); + const calls: { repo: string; stateIssue: number }[] = []; + const opts = baseOptions(dbPath, makeOverrides()); + opts.runConfig.autoDispatch = true; + opts.triggerAutoDispatch = async (o) => { + calls.push(o); + }; + const result = await dispatchRecommender(opts); + expect(result.state).toBe("completed"); + expect(calls).toEqual([{ repo: "thejustinwalsh/middle", stateIssue: 99 }]); + }); + + test("does not fire triggerAutoDispatch when auto_dispatch is off, even if wired", async () => { + const dbPath = join(scratch, "db.sqlite3"); + const calls: unknown[] = []; + const opts = baseOptions(dbPath, makeOverrides()); + opts.runConfig.autoDispatch = false; + opts.triggerAutoDispatch = async (o) => { + calls.push(o); + }; + const result = await dispatchRecommender(opts); + expect(result.state).toBe("completed"); + expect(calls).toEqual([]); + }); }); diff --git a/packages/dispatcher/test/repo-config.test.ts b/packages/dispatcher/test/repo-config.test.ts new file mode 100644 index 00000000..37df6242 --- /dev/null +++ b/packages/dispatcher/test/repo-config.test.ts @@ -0,0 +1,61 @@ +import { afterEach, beforeEach, describe, expect, test } from "bun:test"; +import type { Database } from "bun:sqlite"; +import { openAndMigrate } from "../src/db.ts"; +import { clearPaused, getPausedUntil, isPaused, setPausedUntil } from "../src/repo-config.ts"; + +// Per-repo pause/resume state (#51): `mm pause` sets repo_config.paused_until, +// `mm resume` clears it, and a paused-until-in-the-future repo reads as paused. + +let db: Database; + +beforeEach(() => { + db = openAndMigrate(":memory:"); +}); + +afterEach(() => { + db.close(); +}); + +describe("repo pause/resume", () => { + test("an unpaused repo (no row) reads as not paused", () => { + expect(getPausedUntil(db, "o/r")).toBeNull(); + expect(isPaused(db, "o/r")).toBe(false); + }); + + test("mm pause (indefinite) suspends the repo", () => { + setPausedUntil(db, "o/r"); + expect(isPaused(db, "o/r")).toBe(true); + expect(getPausedUntil(db, "o/r")).toBe(Number.MAX_SAFE_INTEGER); + }); + + test("a paused_until in the future reads as paused; in the past auto-expires", () => { + const now = 1_000_000; + setPausedUntil(db, "o/r", now + 60_000, now); + expect(isPaused(db, "o/r", now)).toBe(true); + // After the timestamp elapses, the pause auto-expires with no cleanup. + expect(isPaused(db, "o/r", now + 120_000)).toBe(false); + }); + + test("mm resume clears the pause", () => { + setPausedUntil(db, "o/r"); + clearPaused(db, "o/r"); + expect(getPausedUntil(db, "o/r")).toBeNull(); + expect(isPaused(db, "o/r")).toBe(false); + }); + + test("pausing is idempotent and re-pausing updates the timestamp", () => { + setPausedUntil(db, "o/r", 5000, 0); + setPausedUntil(db, "o/r", 9000, 0); + expect(getPausedUntil(db, "o/r")).toBe(9000); + // Only one row exists for the repo. + const count = db.query("SELECT count(*) AS n FROM repo_config WHERE repo = ?").get("o/r") as { + n: number; + }; + expect(count.n).toBe(1); + }); + + test("resume on a never-paused repo is a harmless no-op", () => { + clearPaused(db, "o/r"); + expect(getPausedUntil(db, "o/r")).toBeNull(); + }); +}); diff --git a/packages/dispatcher/test/slots.test.ts b/packages/dispatcher/test/slots.test.ts new file mode 100644 index 00000000..b9dfc236 --- /dev/null +++ b/packages/dispatcher/test/slots.test.ts @@ -0,0 +1,148 @@ +import { afterEach, beforeEach, describe, expect, test } from "bun:test"; +import type { Database } from "bun:sqlite"; +import { openAndMigrate } from "../src/db.ts"; +import { getSlotState, hasFreeSlot, reserveSlot, type SlotLimits } from "../src/slots.ts"; +import { createWorkflowRecord, updateWorkflow } from "../src/workflow-record.ts"; + +// Slot accounting (#49): the three dimensions (per-adapter, per-repo, global) +// derived from live `workflows` rows + merged config, and the guard the enqueue +// paths consult. The recommender's dedicated row must never count against the +// dispatch slots. + +let db: Database; + +const LIMITS: SlotLimits = { + perAdapter: { claude: 2, codex: 1 }, + repoMax: 3, + globalMax: 4, +}; + +/** Insert a non-terminal (pending → running) workflow row of the given kind. */ +function addWorkflow( + id: string, + kind: "implementation" | "recommender", + repo: string, + adapter: string, +): void { + createWorkflowRecord(db, { + id, + kind, + repo, + epicNumber: kind === "recommender" ? null : 1, + adapter, + }); + updateWorkflow(db, id, { state: "running" }); +} + +beforeEach(() => { + db = openAndMigrate(":memory:"); +}); + +afterEach(() => { + db.close(); +}); + +describe("getSlotState", () => { + test("free-slot: no active work reports full availability across every dimension", () => { + const state = getSlotState(db, "o/r", LIMITS); + expect(state.byAdapter.claude).toEqual({ used: 0, max: 2, available: 2 }); + expect(state.byAdapter.codex).toEqual({ used: 0, max: 1, available: 1 }); + expect(state.repo).toEqual({ used: 0, max: 3, available: 3 }); + expect(state.global).toEqual({ used: 0, max: 4, available: 4 }); + expect(hasFreeSlot(state, "claude")).toBe(true); + expect(hasFreeSlot(state, "codex")).toBe(true); + }); + + test("at-capacity: a full repo reports zero availability and the guard refuses", () => { + addWorkflow("w1", "implementation", "o/r", "claude"); + addWorkflow("w2", "implementation", "o/r", "codex"); + addWorkflow("w3", "implementation", "o/r", "claude"); // repoMax = 3 → repo now full + const state = getSlotState(db, "o/r", LIMITS); + expect(state.repo).toEqual({ used: 3, max: 3, available: 0 }); + // Even though codex's per-adapter cap (1) isn't yet hit by codex alone (1 used), + // the repo dimension is full, so nothing can enqueue. + expect(hasFreeSlot(state, "claude")).toBe(false); + expect(hasFreeSlot(state, "codex")).toBe(false); + }); + + test("per-adapter cap binds before the repo cap", () => { + addWorkflow("w1", "implementation", "o/r", "claude"); + addWorkflow("w2", "implementation", "o/r", "claude"); // claude cap = 2 → adapter full + const state = getSlotState(db, "o/r", LIMITS); + expect(state.byAdapter.claude).toEqual({ used: 2, max: 2, available: 0 }); + expect(state.repo).toEqual({ used: 2, max: 3, available: 1 }); // repo still has room + // claude is capped out; codex still has a repo slot and its own slot free. + expect(hasFreeSlot(state, "claude")).toBe(false); + expect(hasFreeSlot(state, "codex")).toBe(true); + }); + + test("global cap binds across repos even when this repo has room", () => { + // Fill the global cap (4) with work spread across two repos; this repo (o/r) + // holds 2 of them, so its repo dimension (max 3) still shows room — but the + // global dimension is exhausted, so the guard refuses. + addWorkflow("a1", "implementation", "o/r", "claude"); + addWorkflow("a2", "implementation", "o/r", "codex"); + addWorkflow("b1", "implementation", "other/repo", "claude"); + addWorkflow("b2", "implementation", "other/repo", "codex"); + const state = getSlotState(db, "o/r", LIMITS); + expect(state.repo.used).toBe(2); + expect(state.repo.available).toBe(1); + expect(state.global).toEqual({ used: 4, max: 4, available: 0 }); + expect(hasFreeSlot(state, "claude")).toBe(false); + }); + + test("the recommender's own row is never counted against dispatch slots", () => { + addWorkflow("rec", "recommender", "o/r", "claude"); + const state = getSlotState(db, "o/r", LIMITS); + expect(state.repo.used).toBe(0); + expect(state.global.used).toBe(0); + expect(state.byAdapter.claude!.used).toBe(0); + expect(hasFreeSlot(state, "claude")).toBe(true); + }); + + test("used over max clamps available to 0 (a tightened cap never goes negative)", () => { + addWorkflow("w1", "implementation", "o/r", "claude"); + addWorkflow("w2", "implementation", "o/r", "claude"); + addWorkflow("w3", "implementation", "o/r", "claude"); // 3 claude vs cap 2 + const state = getSlotState(db, "o/r", { perAdapter: { claude: 2 }, repoMax: 2, globalMax: 2 }); + expect(state.byAdapter.claude!.available).toBe(0); + expect(state.repo.available).toBe(0); + expect(state.global.available).toBe(0); + }); + + test("an adapter with no per-adapter cap is gated only by the repo and global dims", () => { + // codex has no entry in perAdapter here → no separate adapter ceiling. + const limits: SlotLimits = { perAdapter: { claude: 2 }, repoMax: 3, globalMax: 4 }; + const state = getSlotState(db, "o/r", limits); + expect(state.byAdapter.codex).toBeUndefined(); + expect(hasFreeSlot(state, "codex")).toBe(true); // repo + global have room + }); +}); + +describe("reserveSlot", () => { + test("decrements the adapter, repo, and global dimensions for the loop's local view", () => { + const state = getSlotState(db, "o/r", LIMITS); + const after = reserveSlot(state, "claude"); + expect(after.byAdapter.claude).toEqual({ used: 1, max: 2, available: 1 }); + expect(after.repo).toEqual({ used: 1, max: 3, available: 2 }); + expect(after.global).toEqual({ used: 1, max: 4, available: 3 }); + // The original is left untouched (pure). + expect(state.repo.available).toBe(3); + }); + + test("reserving down to capacity flips the guard to refuse", () => { + let state = getSlotState(db, "o/r", LIMITS); + state = reserveSlot(state, "codex"); // codex cap = 1 → now full + expect(hasFreeSlot(state, "codex")).toBe(false); + expect(hasFreeSlot(state, "claude")).toBe(true); + }); + + test("reserving an adapter with no cap still decrements repo + global", () => { + const limits: SlotLimits = { perAdapter: { claude: 2 }, repoMax: 3, globalMax: 4 }; + let state = getSlotState(db, "o/r", limits); + state = reserveSlot(state, "codex"); + expect(state.repo.used).toBe(1); + expect(state.global.used).toBe(1); + expect(state.byAdapter.codex).toBeUndefined(); + }); +}); diff --git a/packages/dispatcher/test/workflow-record.test.ts b/packages/dispatcher/test/workflow-record.test.ts index 68153005..8fd2afb7 100644 --- a/packages/dispatcher/test/workflow-record.test.ts +++ b/packages/dispatcher/test/workflow-record.test.ts @@ -8,6 +8,7 @@ import { countActiveImplementationSlots, createWorkflowRecord, getWorkflow, + getWorkflowSource, hasNonTerminalEpicWorkflow, listNonTerminalWorkflows, setUpdateWorkflowObserver, @@ -27,6 +28,38 @@ afterEach(() => { rmSync(dir, { recursive: true, force: true }); }); +describe("dispatch source (#53)", () => { + test("records and reads back source 'manual' / 'auto'; null when unset", () => { + createWorkflowRecord(db, { + id: "m", + kind: "implementation", + repo: "o/r", + epicNumber: 1, + adapter: "claude", + source: "manual", + }); + createWorkflowRecord(db, { + id: "a", + kind: "implementation", + repo: "o/r", + epicNumber: 2, + adapter: "claude", + source: "auto", + }); + createWorkflowRecord(db, { + id: "none", + kind: "recommender", + repo: "o/r", + epicNumber: null, + adapter: "claude", + }); + expect(getWorkflowSource(db, "m")).toBe("manual"); + expect(getWorkflowSource(db, "a")).toBe("auto"); + expect(getWorkflowSource(db, "none")).toBeNull(); + expect(getWorkflowSource(db, "missing")).toBeNull(); + }); +}); + describe("createWorkflowRecord", () => { test("inserts a pending implementation row carrying epic_number", () => { createWorkflowRecord(db, { diff --git a/planning/issues/48/decisions.md b/planning/issues/48/decisions.md new file mode 100644 index 00000000..50133c04 --- /dev/null +++ b/planning/issues/48/decisions.md @@ -0,0 +1,51 @@ +# Decisions — Issue #48 (Auto-dispatch + limits) + +## slots.ts is the slot authority; the guard is consumed at the enqueue paths +**File(s):** `packages/dispatcher/src/slots.ts` +**Date:** 2026-05-24 + +**Decision:** Build `slots.ts` as the single slot authority — `getSlotState` derives the three dimensions (per-adapter, per-repo, global) from live `workflows` rows + merged config; `hasFreeSlot` is the enqueue guard; `reserveSlot` is the loop's local decrement. The guard is *consumed* by the auto-dispatch loop (#50) and manual dispatch (#53); #49 unit-tests the authority + guard directly against a live DB. + +**Why:** The existing `countActiveImplementationSlots` already counts non-terminal implementation rows (recommender excluded) per-repo and globally — `slots.ts` builds the limit/availability layer on top rather than re-counting. Keeping the guard a pure function of `(SlotState, adapter)` makes the at-capacity / free-slot / per-adapter-vs-global cases unit-testable without the engine, and lets both the loop and manual dispatch share one consultation. I deliberately did **not** overload the daemon's `startDispatch` `string | null` return (whose `null` means the 409 collision reservation) with a slots-full signal — conflating "already running" with "queue full" would mislead `mm dispatch`. Slot refusal is wired where it's naturally testable: the loop skips, manual dispatch reports slots-full. + +**Evidence:** `countActiveImplementationSlots` (`workflow-record.ts`) + the recommender's `buildRecommenderContext` already model used-slot derivation; the auto-dispatch pseudocode (spec → "Auto-dispatch loop") consumes `slots.globalAvailable` / `slots.byAdapter[adapter]`. + +## Four auto-dispatch triggers behind one debounced scheduler +**File(s):** `packages/dispatcher/src/main.ts`, `hook-server.ts`, `rate-limits.ts` +**Date:** 2026-05-24 + +**Decision:** The four trigger events all funnel into one `scheduleAutoDispatch(repo)` (250ms debounce + per-repo re-entrancy guard) → `runAutoDispatch(repo)`. Wirings: terminal-state transition (the existing `broadcastWorkflow`, gated on slot-freeing states), rate-limit change (a new module-level `setRateLimitObserver` in `rate-limits.ts`, fanned out to every known repo since rate-limit state is cross-repo by adapter), recommender-run completion (threaded `triggerAutoDispatch` through `dispatchRecommender` → the recommender workflow's existing seam), and manual `mm dispatch` (a new optional `ControlPlane.afterDispatch` the route fires). + +**Why:** A debounced scheduler stops a burst of terminal transitions from launching N overlapping passes, and the re-entrancy guard + rerun flag means a trigger arriving mid-pass coalesces into exactly one follow-up. Crucially, the loop's own `enqueue` calls the daemon's `startDispatchImpl` **directly**, not the HTTP route — so the loop never fires `afterDispatch` and can't recursively re-trigger itself. `afterDispatch` only fires for genuine external/route dispatches. I extracted `startDispatchImpl` from the inline `ControlPlane.startDispatch` so the route and the loop share one collision-guarded enqueue. + +**Evidence:** Mirrors the existing `setUpdateWorkflowObserver` pattern (process-scoped observer, reset to null on shutdown). The recommender workflow already had a tested `triggerAutoDispatch` step gated on `config.autoDispatch` + a clean parse (`recommender-workflow.test.ts`). + +## `mm pause` keys by the git-remote slug; `mm config` is a scoped TOML edit +**File(s):** `packages/cli/src/commands/pause.ts`, `config.ts`, `packages/dispatcher/src/repo-config.ts` +**Date:** 2026-05-24 + +**Decision:** `mm pause`/`mm resume` write `repo_config.paused_until` keyed by the **git-remote-derived** `owner/name` slug (shared `deriveRepoSlug` in `paths.ts`), because that's the exact key the auto-dispatch loop reads (`row.repo` at dispatch, the recommender's `repoSlug`). A pause with no duration is indefinite (`Number.MAX_SAFE_INTEGER`); `isPaused` honors the timestamp so a future-dated pause auto-expires. `mm config` is a formatting-preserving, section-scoped TOML line edit restricted to a known-keys table (v1: `auto_dispatch`), rejecting unknown keys. + +**Why:** Keying pause by anything other than the loop's slug would write a row the loop never reads — a silent no-op. Deriving the slug identically everywhere is the invariant. For `mm config`, a smol-toml round-trip would clobber operator comments/layout, so I edit the target line in place within its `[section]` and leave everything else byte-identical. Restricting to a known-keys table keeps a generic ` ` surface from silently writing typo'd or unsupported keys; the table is the extension point as more keys become settable. + +**Evidence:** `dispatch.ts` and `recommender-run.ts` both derive the slug from the `origin` remote — `paths.ts` now hosts the shared helper. The `[recommender] auto_dispatch` default-false toggle already existed in `config.ts` (`mapRecommender`); the loop's `isAutoDispatchEnabled` now composes it with `!isPaused`. + +## A complexity pause is a tagged `asked-question`, surfaced via an Epic comment +**File(s):** `packages/core/src/adapter.ts`, `packages/adapters/claude/src/classify.ts`, `packages/dispatcher/src/workflows/implementation.ts`, `build-deps.ts` +**Date:** 2026-05-24 + +**Decision:** A complexity pause reuses the existing `asked-question` park spine (which already routes to `waiting-human`), distinguished by a `kind: "complexity"` field on the `.middle/blocked.json` `BlockedSentinel`. The dispatch brief instructs the agent to set that field for a complexity overrun (and carries the repo's `complexity_ceiling` as the fork budget). On park, the workflow passes the pause kind to `postQuestion`; the default `gh`-backed poster comments on the Epic with the literal **complexity pause** framing. The recommender (which owns "Needs human input") reads that comment and classifies the Epic under the `complexity pause` label on its next run. + +**Why:** The dispatcher does not own the state issue's "Needs human input" section (owners line: `dispatcher=in-flight,rate-limits,slot-usage`), so it cannot write the label directly — and the recommender would overwrite it anyway. The honest surfacing path is a GitHub artifact the recommender keys off, exactly mirroring how an ambiguity question is surfaced. Reusing the `asked-question` spine (rather than a new park kind) means the waitFor/resume plumbing (Phase 5) needs no change — a complexity pause resumes the same way once the human clarifies or applies `approved`. `complexity_ceiling` is resolved per-repo at dispatch time (the deps are shared across repos), defaulting to 3. The brief is the delivery mechanism for the `kind` contract, so the agent-side skill needs no edit (out of scope for #52). + +**Evidence:** The recommender skill already lists "An agent paused a sub-issue on a `complexity pause`" as a `needs-human` case and the schema doc already defines the `complexity pause` label. There is no pre-dispatch ceiling gate in `auto-dispatch.ts` — the loop's only gates are slots + rate limits (pinned by a test). + +## `approved` is a GitHub label read into the brief; manual dispatch gets a route-level slot gate +**File(s):** `packages/dispatcher/src/build-deps.ts`, `github.ts`, `hook-server.ts`, `main.ts`, `workflow-record.ts` +**Date:** 2026-05-24 + +**Decision:** The `approved` signal is delivered to the agent via the dispatch brief (built in #52's `defaultDispatchBrief`): the default `isEpicApproved` reads the Epic's labels (`gh issue view --json labels`) and, when `approved` is present, the brief authorizes a best-judgment call past a complexity overrun instead of pausing. Manual `mm dispatch` slot-limiting is a **route-level** gate: `ControlPlane.slotAvailable(repo, adapter)` (implemented in `main.ts` via `getSlotState` + `hasFreeSlot`) is consulted by the `/control/dispatch` handler, returning **429** when full. Dispatch origin is recorded as `meta_json.source` (`"manual"` for a route dispatch, `"auto"` for a loop enqueue), threaded through `ImplementationInput` → `createWorkflowRecord` and carried forward by continuations. + +**Why:** Delivering `approved` through the brief (not a separate channel) keeps it alongside `complexity_ceiling` — one place the agent reads its fork policy. The slot gate is route-level rather than inside `startDispatchImpl` because the auto-dispatch loop already does its own local slot accounting and enqueues through `startDispatchImpl` *directly* (bypassing the route); putting the DB-slot check in the shared `startDispatchImpl` would double-gate the loop inconsistently (its rows are created async, so a mid-pass DB read disagrees with the loop's local view). Keeping it on the route means it fires only for genuine manual dispatches. A distinct **429** (vs. the collision **409**) tells `mm dispatch` *why* it was refused — "queue full", not "already running". + +**Evidence:** `getSlotState`/`hasFreeSlot` (#49) are the slot authority; the loop (#50) reserves a local view; the schema's `meta_json` column is the documented adapter-scratch field. The brief's approved/not-approved framing landed in #52's `defaultDispatchBrief`. diff --git a/planning/issues/48/plan.md b/planning/issues/48/plan.md new file mode 100644 index 00000000..fda7fec6 --- /dev/null +++ b/planning/issues/48/plan.md @@ -0,0 +1,41 @@ +# Issue #48: Auto-dispatch and limits (Epic, Phase 8) + +**Link:** https://github.com/thejustinwalsh/middle/issues/48 +**Branch:** middle-issue-48 + +## Goal +Make dispatch autonomous within limits: slot accounting that gates the enqueue path, the auto-dispatch loop that consumes it, a per-repo opt-in toggle + pause/resume, runtime complexity-pause routing to `waiting-human`, and the `approved`-label override. After this Epic, with auto-dispatch enabled on middle's own repo, ready Epics auto-dispatch within their slot limits — and nothing dispatches over the complexity ceiling without `approved`. (Scheduling the recommender on cron is out of scope here, tracked as follow-up `#135`.) + +## Approach +- Build on what already exists: `countActiveImplementationSlots` (per-adapter/total/global, recommender row excluded) is the slot-counting primitive; `rate-limits.ts` is the rate-limit source of truth; the recommender's `triggerAutoDispatch` dep is the already-stubbed seam; the implementation workflow's `asked-question` park already routes to `waiting-human`. Each phase adds the missing enforcement/wiring on top rather than re-architecting. +- The auto-dispatch loop is a pure-ish function with injected deps (read state issue, slot state, rate-limit state, repo gating, enqueue) so it's unit-testable without the engine; the daemon wires the four triggers to it. +- A "complexity pause" is a variety of the existing `asked-question` park, distinguished by a `kind` field the agent writes into `.middle/blocked.json`. The dispatcher reads it, routes to `waiting-human` (unchanged spine), and surfaces it so the recommender labels it `complexity pause`. No pre-dispatch ceiling gate is ever added. +- TDD throughout (the sub-issues all cite the `test-driven-development` skill): test first, then implement. + +## Phases (one per open sub-issue) +1. **#49 — Slot tracking + enforcement in the enqueue path.** `packages/dispatcher/src/slots.ts`: derive used/max per-adapter, per-repo, global from live `workflows` state + merged config; an enqueue-guard that refuses when no slot is free; recommender slot excluded. Tests: at-capacity, free-slot, per-adapter-vs-global. +2. **#50 — Auto-dispatch loop (four trigger events).** `packages/dispatcher/src/auto-dispatch.ts`: the spec's loop — walk `readyToDispatch`, skip rate-limited adapters + exhausted slots, decrement local counters, no-op for a disabled repo. Wire the four triggers (recommender-run completes, workflow terminal transition, rate-limit change, manual `mm dispatch`) in `main.ts` + the recommender seam. Tests: normal pass, rate-limited-adapter skip, slots-exhausted stop. +3. **#51 — Per-repo `auto_dispatch` toggle + pause/resume.** `repo_config` helpers for `paused_until`; the loop checks both the `[recommender] auto_dispatch` toggle (default false) and pause state. `mm pause ` / `mm resume ` / `mm config `. Tests: toggle off, toggle on, paused-until-in-future. +4. **#52 — Route sub-issue complexity overruns to waiting-human.** `BlockedSentinel` gains an optional pause `kind`; a complexity-kind park routes to `waiting-human` and is surfaced for the recommender's `complexity pause` label; `complexity_ceiling` (merged config, default 3) injected into the agent brief. Assert no pre-dispatch ceiling check in the loop. Tests: overrun routes to waiting-human; in-ceiling does not pause. +5. **#53 — `approved`-label handling for complexity pauses.** Read the Epic's `approved` label; when present, the brief tells the agent it may proceed past an overrun with a best-judgment call; without it, the overrun pauses (#52). Manual `mm dispatch` stays slot-limited and is logged `source: 'manual'`. Tests: approved proceeds, non-approved pauses, manual is slot-limited. + +## Files likely to change +- `packages/dispatcher/src/slots.ts` (new) — slot derivation + enqueue guard +- `packages/dispatcher/src/auto-dispatch.ts` (new) — the loop +- `packages/dispatcher/src/repo-config.ts` (new) — `repo_config` row helpers (paused_until, toggle snapshot) +- `packages/dispatcher/src/main.ts` — wire the four auto-dispatch triggers; pass complexity_ceiling/approved into deps +- `packages/dispatcher/src/workflows/implementation.ts` — complexity-pause surfacing + brief injection of ceiling/approved +- `packages/dispatcher/src/recommender-run.ts` / `workflows/recommender.ts` — wire `triggerAutoDispatch` +- `packages/core/src/adapter.ts` (+ `packages/adapters/claude/src/classify.ts`) — `BlockedSentinel` pause `kind` +- `packages/cli/src/commands/pause.ts`, `resume.ts`, `config.ts` (new) + `packages/cli/src/index.ts` — CLI surface +- `packages/dispatcher/src/index.ts` — export new public surface; per-folder docs as needed +- tests alongside each + +## Out of scope +- Slot pills + auto-dispatch toggle UI in the dashboard (Phase 9) +- The agent-side fork mechanic / pause decision (the `implementing-github-issues` skill) — #52 defines only the sentinel contract the dispatcher reads +- Applying the `approved` label (a human action) +- The waitFor/resume plumbing (Phase 5 — already built) + +## Open questions +- None blocking. The `complexity pause` surfacing mechanism (dispatcher posts a recognizable comment that the recommender classifies, vs. a label) will be resolved during #52 against the existing `postQuestion` pattern and the recommender skill's `needs-human` classification — both already reference `complexity pause`.