diff --git a/tools/bg/backlog-ready-notifier.test.ts b/tools/bg/backlog-ready-notifier.test.ts index fb506a902..4ae3fd7ee 100644 --- a/tools/bg/backlog-ready-notifier.test.ts +++ b/tools/bg/backlog-ready-notifier.test.ts @@ -8,6 +8,7 @@ import { runOnce, type Adapters, type BacklogRow, + type AssignmentHistory, } from "./backlog-ready-notifier"; import type { AgentId, MessageEnvelope, SenderAgentId } from "../bus/types"; @@ -25,6 +26,8 @@ function fakeAdapters( capturedCalls: FakeAssignmentCall[] = [], gitLogStr: string = "", ghPrListStr: string = "", + history: AssignmentHistory | null = null, + writtenHistory: AssignmentHistory[] = [], ): Adapters { return { now: () => new Date(nowIso), @@ -46,6 +49,8 @@ function fakeAdapters( }, execGitLog: () => gitLogStr, execGhPrList: () => ghPrListStr, + readHistoryFile: () => history, + writeHistoryFile: (_, h) => { writtenHistory.push(h); }, }; } @@ -344,7 +349,7 @@ title: only a title }); test("runOnce returns a single result without daemon mode", () => { - const result = runOnce({ ...DEFAULT_CONFIG, backlogDir: "/nonexistent" }); + const result = runOnce({ ...DEFAULT_CONFIG, backlogDir: "/nonexistent" }, fakeAdapters("2026-05-13T18:00:00Z", [])); expect(result.pollAt).toMatch(/^\d{4}-\d{2}-\d{2}T/); // /nonexistent has no P*/ dirs so should report 0 rows expect(result.totalOpenRows).toBe(0); @@ -425,6 +430,162 @@ title: only a title expect(result.publishedEnvelopeIds).toHaveLength(3); expect(captured).toHaveLength(3); }); + test("pollOnce with queue-empty adapters AND ready rows → queueBusy: false, publishes", () => { + const captured: FakeAssignmentCall[] = []; + // clean git log and prs + const adapters = fakeAdapters("2026-05-13T18:00:00Z", [ROW_OPEN_NO_DEPS], captured, "", ""); + const config = { ...DEFAULT_CONFIG, targetAgent: "testagent" }; + + const result = pollOnce(config, adapters); + + expect(result.queueBusy).toBe(false); + expect(result.publishedEnvelopeIds).toHaveLength(1); + expect(captured).toHaveLength(1); + }); + }); + + describe("assignment history dedup / cooldown (slice 5)", () => { + test("History file absent → treated as empty; first assignment proceeds normally", () => { + const captured: FakeAssignmentCall[] = []; + const written: AssignmentHistory[] = []; + const adapters = fakeAdapters("2026-05-13T18:00:00Z", [ROW_OPEN_NO_DEPS], captured, "", "", null, written); + + const result = pollOnce(DEFAULT_CONFIG, adapters); + + expect(result.publishedEnvelopeIds).toHaveLength(1); + expect(result.skippedDueToCooldown).toHaveLength(0); + expect(written).toHaveLength(1); + expect(written[0]!.entries).toHaveLength(1); + expect(written[0]!.entries[0]!.rowId).toBe("B-9001"); + expect(written[0]!.entries[0]!.publishedAt).toBe("2026-05-13T18:00:00.000Z"); + }); + + test("Row assigned at T=0; same row at T=15min (within 30min cooldown) → skipped", () => { + const captured: FakeAssignmentCall[] = []; + const written: AssignmentHistory[] = []; + const history = { entries: [{ rowId: "B-9001", publishedAt: "2026-05-13T18:00:00.000Z" }] }; + const adapters = fakeAdapters("2026-05-13T18:15:00Z", [ROW_OPEN_NO_DEPS], captured, "", "", history, written); + + const result = pollOnce(DEFAULT_CONFIG, adapters); + + expect(result.publishedEnvelopeIds).toHaveLength(0); + expect(result.skippedDueToCooldown).toEqual(["B-9001"]); + expect(captured).toHaveLength(0); + // nothing published, so nothing written + expect(written).toHaveLength(0); + }); + + test("Row assigned at T=0; same row at T=35min (after 30min cooldown) → re-assigned", () => { + const captured: FakeAssignmentCall[] = []; + const written: AssignmentHistory[] = []; + const history = { entries: [{ rowId: "B-9001", publishedAt: "2026-05-13T18:00:00.000Z" }] }; + const adapters = fakeAdapters("2026-05-13T18:35:00Z", [ROW_OPEN_NO_DEPS], captured, "", "", history, written); + + const result = pollOnce(DEFAULT_CONFIG, adapters); + + expect(result.publishedEnvelopeIds).toHaveLength(1); + expect(result.skippedDueToCooldown).toHaveLength(0); + expect(captured).toHaveLength(1); + expect(written).toHaveLength(1); + // Prunes old entry and adds new + expect(written[0]!.entries).toHaveLength(1); + expect(written[0]!.entries[0]!.rowId).toBe("B-9001"); + expect(written[0]!.entries[0]!.publishedAt).toBe("2026-05-13T18:35:00.000Z"); + }); + + test("Multiple rows in cooldown → only expired rows published; skippedDueToCooldown lists skipped IDs", () => { + const captured: FakeAssignmentCall[] = []; + const written: AssignmentHistory[] = []; + const history = { entries: [ + { rowId: "B-9001", publishedAt: "2026-05-13T18:00:00.000Z" }, // inside cooldown + { rowId: "B-9002", publishedAt: "2026-05-13T17:00:00.000Z" }, // expired + ] }; + const adapters = fakeAdapters("2026-05-13T18:15:00Z", [ROW_OPEN_NO_DEPS, ROW_CLOSED, ROW_OPEN_DEPS_SATISFIED], captured, "", "", history, written); + + const result = pollOnce(DEFAULT_CONFIG, adapters); + + expect(result.skippedDueToCooldown).toEqual(["B-9001"]); + expect(result.publishedEnvelopeIds).toHaveLength(1); + expect(captured).toHaveLength(1); + expect(captured[0]!.rowId).toBe("B-9002"); + expect(written[0]!.entries).toHaveLength(2); // keeps the 18:00:00 (active) and adds the new 18:15:00 + expect(written[0]!.entries.map(e => e.rowId)).toEqual(["B-9001", "B-9002"]); + }); + + test("History pruning: entries older than cooldownMin removed on write", () => { + const captured: FakeAssignmentCall[] = []; + const written: AssignmentHistory[] = []; + const history = { entries: [ + { rowId: "B-9999", publishedAt: "2026-05-13T10:00:00.000Z" }, // very old + { rowId: "B-9001", publishedAt: "2026-05-13T18:00:00.000Z" }, // inside cooldown + ] }; + // B-9002 is ready, will trigger a write + const adapters = fakeAdapters("2026-05-13T18:15:00Z", [ROW_CLOSED, ROW_OPEN_DEPS_SATISFIED], captured, "", "", history, written); + + pollOnce(DEFAULT_CONFIG, adapters); + + expect(written[0]!.entries.map(e => e.rowId)).toEqual(["B-9001", "B-9002"]); // B-9999 was pruned + }); + + test("cooldown rows track the actually-published row (not toAssign[i]) when invalid-priority row is skipped (Codex P2)", () => { + const captured: FakeAssignmentCall[] = []; + const written: AssignmentHistory[] = []; + // First row has invalid priority — it will be skipped via `continue` + // in pollOnce. Second row publishes successfully. Without the + // publishedPairs fix, cooldown history would record the SKIPPED + // row's id (toAssign[0]) instead of the published one. + const ROW_BAD_PRIORITY: BacklogRow = { + id: "B-9100", + priority: "XX" as unknown as "P1", + status: "open", + dependsOn: [], + filename: "B-9100-bad-priority.md", + }; + const adapters = fakeAdapters("2026-05-15T22:00:00Z", [ROW_BAD_PRIORITY, ROW_OPEN_NO_DEPS], captured, "", "", null, written); + const result = pollOnce(DEFAULT_CONFIG, adapters); + + expect(result.publishedEnvelopeIds).toHaveLength(1); + expect(captured).toHaveLength(1); + expect(captured[0]!.rowId).toBe("B-9001"); + // Cooldown row must reference the actually-published row B-9001, NOT + // the wrong-indexed B-9100 (the skipped one at toAssign[0]). + expect(written).toHaveLength(1); + expect(written[0]!.entries.map(e => e.rowId)).toEqual(["B-9001"]); + }); + + test("readHistoryFile returns malformed shape ({}) → treated as empty, does not throw (Codex P1)", () => { + const captured: FakeAssignmentCall[] = []; + const written: AssignmentHistory[] = []; + // History adapter returns an object that parses as JSON but lacks + // `.entries`. Without normalizeHistory, pollOnce would call .filter + // on undefined and throw, aborting the notifier loop. + const adapters = fakeAdapters( + "2026-05-15T22:00:00Z", + [ROW_OPEN_NO_DEPS], + captured, "", "", + { } as unknown as AssignmentHistory, + written, + ); + const result = pollOnce(DEFAULT_CONFIG, adapters); + + // No throw → result is well-formed; falls through to first-assignment behavior. + expect(result.publishedEnvelopeIds).toHaveLength(1); + expect(result.skippedDueToCooldown).toHaveLength(0); + }); + + test("readHistoryFile returns malformed entries (non-array) → entries treated as empty (Codex P1)", () => { + const captured: FakeAssignmentCall[] = []; + const written: AssignmentHistory[] = []; + const adapters = fakeAdapters( + "2026-05-15T22:00:00Z", + [ROW_OPEN_NO_DEPS], + captured, "", "", + { entries: "garbage" } as unknown as AssignmentHistory, + written, + ); + const result = pollOnce(DEFAULT_CONFIG, adapters); + expect(result.publishedEnvelopeIds).toHaveLength(1); + }); }); describe("parseArgs", () => { @@ -442,17 +603,23 @@ title: only a title expect(config.backlogDir).toBe("/custom"); }); - test("--no-publish + --agent + --to + --max-assignments", () => { + test("--no-publish + --agent + --to + --max-assignments + --target-agent + --history-file + --cooldown-min", () => { const config = parseArgs([ "--no-publish", "--agent", "vera", "--to", "lior", "--max-assignments", "5", + "--target-agent", "riven", + "--history-file", "/custom/history.json", + "--cooldown-min", "60", ]); expect(config.noPublish).toBe(true); expect(config.fromAgent).toBe("vera"); expect(config.toAgent).toBe("lior"); expect(config.maxAssignments).toBe(5); + expect(config.targetAgent).toBe("riven"); + expect(config.historyFile).toBe("/custom/history.json"); + expect(config.cooldownMin).toBe(60); }); test("rejects unknown flags", () => { @@ -462,5 +629,16 @@ title: only a title test("rejects --backlog-dir without value", () => { expect(() => parseArgs(["--backlog-dir"])).toThrow(/requires a value/); }); + + test("rejects unknown --target-agent (Copilot review: typo silently no-ops queue check)", () => { + expect(() => parseArgs(["--target-agent", "ott"])).toThrow(/--target-agent must be one of/); + }); + + test("--target-agent accepts known identity keys (otto/alexa/lior/vera/riven)", () => { + for (const name of ["otto", "alexa", "lior", "vera", "riven"]) { + const config = parseArgs(["--target-agent", name]); + expect(config.targetAgent).toBe(name); + } + }); }); }); diff --git a/tools/bg/backlog-ready-notifier.ts b/tools/bg/backlog-ready-notifier.ts index d7b904289..35a3ef4bb 100644 --- a/tools/bg/backlog-ready-notifier.ts +++ b/tools/bg/backlog-ready-notifier.ts @@ -12,8 +12,8 @@ // Run: bun tools/bg/backlog-ready-notifier.ts [--once] [--poll-min N] [--backlog-dir PATH] [--no-publish] [--agent NAME] [--to NAME] [--max-assignments N] // Compose with: B-0441 + B-0400 (bus) + B-0440 (reactive peer). -import { readdirSync, readFileSync } from "node:fs"; -import { join } from "node:path"; +import { readdirSync, readFileSync, writeFileSync, renameSync, mkdirSync } from "node:fs"; +import { join, dirname } from "node:path"; import { publish } from "../bus/bus"; import { AGENT_IDS, SENDER_IDS, type AgentId, type MessageEnvelope, type SenderAgentId } from "../bus/types"; @@ -32,6 +32,19 @@ export type NotifierConfig = { toAgent: AgentId; /** Max number of work-assignment envelopes to publish per poll */ maxAssignments: number; + /** + * Agent whose queue state is checked before assignment. CLI input is + * validated against `Object.keys(AGENT_MAP)` (identity-level: otto / alexa / + * lior / vera / riven) so typos error loudly instead of silently no-opping + * the queue check via the "unknown agent = empty" branch of + * isAgentQueueEmpty. Tests may set this field directly to inject custom + * patterns via `adapters.agentPatterns`. + */ + targetAgent: string; + /** File path to track recent assignments */ + historyFile: string; + /** Cooldown window in minutes to avoid re-assigning the same row */ + cooldownMin: number; }; export const DEFAULT_CONFIG: NotifierConfig = { @@ -42,6 +55,18 @@ export const DEFAULT_CONFIG: NotifierConfig = { fromAgent: "otto", toAgent: "*", maxAssignments: 3, + targetAgent: "otto", + historyFile: process.env.ZETA_BUS_DIR ? join(process.env.ZETA_BUS_DIR, "assignment-history.json") : "/tmp/zeta-bus/assignment-history.json", + cooldownMin: 30, +}; + +export type AssignmentHistoryEntry = { + rowId: string; + publishedAt: string; // ISO-8601 +}; + +export type AssignmentHistory = { + entries: AssignmentHistoryEntry[]; }; export type BacklogRow = { @@ -60,6 +85,8 @@ export type PollResult = { publishedEnvelopeIds: string[]; /** Structured publish-failure reason; null on success or skip. */ lastPublishError: string | null; + queueBusy: boolean; + skippedDueToCooldown: string[]; note: string; }; @@ -79,6 +106,8 @@ export type Adapters = { execGitLog: (sinceMinutes: number) => string | null; /** Returns `gh pr list` JSON output, or null if the gh invocation fails (treat as indeterminate). */ execGhPrList: () => string | null; + readHistoryFile: (path: string) => AssignmentHistory | null; + writeHistoryFile: (path: string, history: AssignmentHistory) => void; }; // Keys are lowercase to match the canonical bus agent IDs (SENDER_IDS in tools/bus/types.ts). @@ -192,8 +221,46 @@ const REAL_ADAPTERS: Adapters = { if (result.status !== 0 || result.error) return null; return result.stdout ?? ""; }, + readHistoryFile: (path: string) => { + try { + return JSON.parse(readFileSync(path, "utf8")) as AssignmentHistory; + } catch { + return null; + } + }, + // Atomic file replacement: tmp-then-rename guarantees readers never see + // a torn (half-written) file. It does NOT serialize across concurrent + // writers — two notifier instances writing simultaneously will both + // rename their tmp file over the target, last-writer-wins. The dedup / + // cooldown contract here holds only for single-process notifier; a + // CAS-on-content or file-lock layer is out of scope for slice 5a. + writeHistoryFile: (path: string, history: AssignmentHistory) => { + const tmp = `${path}.tmp.${Date.now()}`; + const dir = dirname(path); + mkdirSync(dir, { recursive: true }); + writeFileSync(tmp, JSON.stringify(history, null, 2)); + renameSync(tmp, path); + }, }; +/** + * Defensive normalizer: accepts whatever readHistoryFile returns and + * coerces it to a well-formed AssignmentHistory. Returning `null`, + * `{}`, or any object missing/malforming `entries` falls back to an + * empty entries array rather than throwing inside pollOnce's filter. + */ +function normalizeHistory(raw: AssignmentHistory | null): AssignmentHistory { + if (raw === null || typeof raw !== "object") return { entries: [] }; + const entries = (raw as { entries?: unknown }).entries; + if (!Array.isArray(entries)) return { entries: [] }; + return { entries: entries.filter( + (e): e is AssignmentHistoryEntry => + e !== null && typeof e === "object" + && typeof (e as { rowId?: unknown }).rowId === "string" + && typeof (e as { publishedAt?: unknown }).publishedAt === "string", + ) }; +} + /** * Returns true if the agent has no commits in the last 30 minutes AND * no currently open PRs. Returns false (conservative) when any adapter @@ -261,10 +328,39 @@ export function pollOnce( r.dependsOn.every(dep => isDepSatisfied(idToStatus.get(dep))), ); - const publishedEnvelopeIds: string[] = []; + const busy = !isAgentQueueEmpty(config.targetAgent, adapters); + if (busy) { + return { + pollAt: pollAt.toISOString(), + totalOpenRows: openRows.length, + readyRowsFound: readyRows.length, + candidateIds: readyRows.slice(0, 10).map(r => r.id), + publishedEnvelopeIds: [], + lastPublishError: null, + queueBusy: true, + skippedDueToCooldown: [], + note: `queue busy for ${config.targetAgent} — skip publish`, + }; + } + + const history = normalizeHistory(adapters.readHistoryFile(config.historyFile)); + const cooldownMs = config.cooldownMin * 60_000; + const activeEntries = new Set( + history.entries + .filter(e => pollAt.getTime() - new Date(e.publishedAt).getTime() < cooldownMs) + .map(e => e.rowId), + ); + + const toPublishRows = readyRows.filter(r => !activeEntries.has(r.id)); + const skippedDueToCooldown = readyRows.filter(r => activeEntries.has(r.id)).map(r => r.id); + + // Track envelope+row as pairs so cooldown entries record the row that was + // actually published (not a wrong-indexed `toAssign[i]` when an earlier + // row was skipped via `continue` for invalid priority). + const publishedPairs: { envelopeId: string; rowId: string }[] = []; let lastPublishError: string | null = null; - if (!config.noPublish && readyRows.length > 0) { - const toAssign = readyRows.slice(0, config.maxAssignments); + if (!config.noPublish && toPublishRows.length > 0) { + const toAssign = toPublishRows.slice(0, config.maxAssignments); for (const row of toAssign) { if (!isValidPriority(row.priority)) continue; const rationale = `Ready-to-grind: ${row.id} is open with all deps satisfied. Decomposition discipline (PR #2999) says decompose ambiguous parents into concrete slices.`; @@ -276,7 +372,7 @@ export function pollOnce( row.priority, rationale, ); - publishedEnvelopeIds.push(envelope.id); + publishedPairs.push({ envelopeId: envelope.id, rowId: row.id }); } catch (e) { // Bus publish failure must NOT kill the poll loop. Captured in // lastPublishError (structured + machine-readable per Riven P1). @@ -284,7 +380,18 @@ export function pollOnce( break; // stop the batch on first failure; next tick retries } } + + if (publishedPairs.length > 0) { + const newEntries: AssignmentHistoryEntry[] = [ + ...history.entries.filter( + e => pollAt.getTime() - new Date(e.publishedAt).getTime() < cooldownMs + ), + ...publishedPairs.map(p => ({ rowId: p.rowId, publishedAt: pollAt.toISOString() })), + ]; + adapters.writeHistoryFile(config.historyFile, { entries: newEntries }); + } } + const publishedEnvelopeIds = publishedPairs.map(p => p.envelopeId); const danglingNote = danglingDeps.size > 0 ? ` (warning: ${danglingDeps.size} dangling dep ref(s) — first: ${[...danglingDeps].slice(0, 3).join(", ")})` @@ -305,6 +412,8 @@ export function pollOnce( candidateIds: readyRows.slice(0, 10).map(r => r.id), publishedEnvelopeIds, lastPublishError, + queueBusy: false, + skippedDueToCooldown, note: readyRows.length > 0 ? `${readyRows.length} of ${openRows.length} open rows are ready-to-grind; top candidates: ${readyRows.slice(0, 5).map(r => r.id).join(", ")}${publishNote}${danglingNote}` : `${openRows.length} open rows but none ready${danglingNote}`, @@ -312,8 +421,8 @@ export function pollOnce( } /** Run a single poll iteration and return its result. */ -export function runOnce(config: NotifierConfig = DEFAULT_CONFIG): PollResult { - const result = pollOnce(config); +export function runOnce(config: NotifierConfig = DEFAULT_CONFIG, adapters?: Adapters): PollResult { + const result = pollOnce(config, adapters); console.log(JSON.stringify(result)); return result; } @@ -355,6 +464,18 @@ function parseAgentId(raw: string | undefined): AgentId { throw new Error(`--to must be one of ${AGENT_IDS.join(", ")}; got "${raw}"`); } +// `targetAgent` indexes into AGENT_MAP (identity-level keys only, since +// queue-state patterns are defined per identity, not per surface). Validate +// the CLI input here so a typo (`--target-agent ott`) errors loudly instead +// of silently bypassing the queue-busy check via the +// "unknown agent = queue empty" branch in isAgentQueueEmpty. +function parseTargetAgent(raw: string | undefined): string { + if (raw === undefined) throw new Error("--target-agent requires a value"); + const known = Object.keys(AGENT_MAP); + if (known.includes(raw)) return raw; + throw new Error(`--target-agent must be one of ${known.join(", ")}; got "${raw}"`); +} + const KNOWN_FLAGS = [ "--once", "--poll-min", @@ -363,6 +484,9 @@ const KNOWN_FLAGS = [ "--agent", "--to", "--max-assignments", + "--target-agent", + "--history-file", + "--cooldown-min", ] as const; export function parseArgs(argv: string[]): NotifierConfig { @@ -376,6 +500,14 @@ export function parseArgs(argv: string[]): NotifierConfig { config.noPublish = true; } else if (arg === "--poll-min") { config.pollIntervalMin = parsePositiveMinutes(argv[++i], "--poll-min"); + } else if (arg === "--target-agent") { + config.targetAgent = parseTargetAgent(argv[++i]); + } else if (arg === "--history-file") { + const next = argv[++i]; + if (next === undefined) throw new Error("--history-file requires a value"); + config.historyFile = next; + } else if (arg === "--cooldown-min") { + config.cooldownMin = parsePositiveMinutes(argv[++i], "--cooldown-min"); } else if (arg === "--backlog-dir") { const next = argv[++i]; if (next === undefined) throw new Error("--backlog-dir requires a value");