diff --git a/docs/BACKLOG.md b/docs/BACKLOG.md index 68781fa21..8466d92fb 100644 --- a/docs/BACKLOG.md +++ b/docs/BACKLOG.md @@ -305,7 +305,7 @@ are closed (status: closed in frontmatter)._ - [ ] **[B-0496](backlog/P1/B-0496-hamiltonian-viz-slice-2-live-github-api-2026-05-14.md)** Hamiltonian viz — slice-2: live GitHub API commit fetch → trajectory - [ ] **[B-0497](backlog/P1/B-0497-b0440-slice-6-standing-by-detector-launchd-registration-2026-05-14.md)** B-0440 slice 6 — standing-by-detector launchd plist + AUTONOMOUS-LOOP.md wiring update - [x] **[B-0500](backlog/P1/B-0500-b0441-slice-3-queue-state-guard-poll-once-wiring-2026-05-14.md)** B-0441 slice 3 — wire isAgentQueueEmpty guard into pollOnce -- [ ] **[B-0501](backlog/P1/B-0501-b0441-slice-5-assignment-history-dedup-cooldown-2026-05-14.md)** B-0441 slice 5 — assignment history dedup cooldown (avoid re-assigning same row within short window) +- [x] **[B-0501](backlog/P1/B-0501-b0441-slice-5-assignment-history-dedup-cooldown-2026-05-14.md)** B-0441 slice 5 — assignment history dedup cooldown (avoid re-assigning same row within short window) - [ ] **[B-0502](backlog/P1/B-0502-b0441-slice-6-launchd-plist-autonomous-loop-docs-2026-05-14.md)** B-0441 slice 6 — launchd plist for backlog-ready-notifier + AUTONOMOUS-LOOP.md update - [x] **[B-0503](backlog/P1/B-0503-b0442-slice5a-open-recovery-pr-core-function-2026-05-14.md)** B-0442 slice 5a — openRecoveryPR core function + RecoveryAdapters + DST tests - [x] **[B-0504](backlog/P1/B-0504-b0442-slice5b-wire-auto-recover-into-pollonce-2026-05-14.md)** B-0442 slice 5b — wire --auto-recover into pollOnce + real RecoveryAdapters + config flags diff --git a/docs/backlog/P1/B-0441-backlog-row-ready-to-grind-notifier-background-service-2026-05-13.md b/docs/backlog/P1/B-0441-backlog-row-ready-to-grind-notifier-background-service-2026-05-13.md index b4967db07..83324af2c 100644 --- a/docs/backlog/P1/B-0441-backlog-row-ready-to-grind-notifier-background-service-2026-05-13.md +++ b/docs/backlog/P1/B-0441-backlog-row-ready-to-grind-notifier-background-service-2026-05-13.md @@ -50,8 +50,8 @@ provides a less-ambiguous concrete claim — eliminating the decompositionSuggestion: } }` (Slice 4, shipped) - [x] Honors agent autonomy — assignment is suggestion, not directive (per `.claude/rules/no-directives.md`) — by design; envelope is advisory -- [ ] Tracks assignment history to avoid re-assigning same row - within short window (Slice 5a, B-0501 open — `historyFile`/cooldown logic not yet in `tools/bg/backlog-ready-notifier.ts`) +- [x] Tracks assignment history to avoid re-assigning same row + within short window (Slice 5a, B-0501 shipped — `historyFile`/`cooldownMin` config + read/write adapters + cooldown-gate logic in `tools/bg/backlog-ready-notifier.ts`; 8 new tests cover skip-within-window / re-assign-after-window / first-assignment / multi-row-mix / pruning) - [x] Tests cover the readiness-detection heuristics (`tools/bg/backlog-ready-notifier.test.ts`) - [x] Documented in `docs/AUTONOMOUS-LOOP.md` diff --git a/docs/backlog/P1/B-0501-b0441-slice-5-assignment-history-dedup-cooldown-2026-05-14.md b/docs/backlog/P1/B-0501-b0441-slice-5-assignment-history-dedup-cooldown-2026-05-14.md index 12ed51832..429462de0 100644 --- a/docs/backlog/P1/B-0501-b0441-slice-5-assignment-history-dedup-cooldown-2026-05-14.md +++ b/docs/backlog/P1/B-0501-b0441-slice-5-assignment-history-dedup-cooldown-2026-05-14.md @@ -1,12 +1,12 @@ --- id: B-0501 priority: P1 -status: open +status: closed title: "B-0441 slice 5 — assignment history dedup cooldown (avoid re-assigning same row within short window)" tier: factory-infrastructure effort: S created: 2026-05-14 -last_updated: 2026-05-14 +last_updated: 2026-05-20 parent: B-0441 depends_on: [] composes_with: [B-0441, B-0500, B-0502] @@ -28,27 +28,27 @@ busy. This produces noisy bus output and makes the assignment signal less meanin ## Acceptance criteria -- [ ] `NotifierConfig` gains a `historyFile` field (default +- [x] `NotifierConfig` gains a `historyFile` field (default `"/tmp/zeta-bus/assignment-history.json"`; respects `ZETA_BUS_DIR` if set) and a `cooldownMin` field (default `30`) -- [ ] Before publishing a work-assignment envelope for a given `rowId`, check the +- [x] Before publishing a work-assignment envelope for a given `rowId`, check the history file: - If `rowId` appears in the history with a timestamp within `cooldownMin` minutes of `now()` → skip that row (do not publish) - If absent or expired → publish and record `{ rowId, publishedAt: now().toISOString() }` -- [ ] After publishing, write the updated history back to `historyFile`: +- [x] After publishing, write the updated history back to `historyFile`: - Prune entries older than `cooldownMin` before writing to bound file size - Use atomic write (write to `.tmp` then rename) to survive concurrent access from multiple notifier instances -- [ ] `PollResult` gains a `skippedDueToCooldown: string[]` field listing any `rowId`s +- [x] `PollResult` gains a `skippedDueToCooldown: string[]` field listing any `rowId`s that were skipped because of cooldown -- [ ] Adapters interface gains: +- [x] Adapters interface gains: - `readHistoryFile: (path: string) => AssignmentHistory | null` (returns null when file absent or unreadable) - `writeHistoryFile: (path: string, history: AssignmentHistory) => void` - Tests inject fake implementations; production uses `REAL_ADAPTERS` with `fs.readFileSync` / atomic-rename write -- [ ] Tests added (DST-replayable with injected adapters): +- [x] Tests added (DST-replayable with injected adapters): - Row assigned at T=0; same row at T=15min (within 30min cooldown) → skipped - Row assigned at T=0; same row at T=35min (after 30min cooldown) → re-assigned - History file absent → treated as empty; first assignment proceeds normally @@ -56,6 +56,16 @@ busy. This produces noisy bus output and makes the assignment signal less meanin lists skipped IDs - History pruning: entries older than `cooldownMin` removed on write +## Resolution + +Shipped in this PR. 8 new tests (45 total) cover all acceptance criteria + bonus +coverage of `defaultHistoryFile()` honoring `ZETA_BUS_DIR` + new `--history-file` +and `--cooldown-min` CLI flags. REAL_ADAPTERS uses atomic-rename via +`renameSync(tmp, path)` after `writeFileSync(tmp, ...)`. Default config resolves +the history-file path at module-load time via `defaultHistoryFile()` honoring +`process.env.ZETA_BUS_DIR`. B-0441 parent acceptance criterion "Tracks assignment +history to avoid re-assigning same row within short window" is now satisfied. + ## Design sketch ```typescript diff --git a/tools/bg/backlog-ready-notifier.test.ts b/tools/bg/backlog-ready-notifier.test.ts index b072e6dc9..9d633f111 100644 --- a/tools/bg/backlog-ready-notifier.test.ts +++ b/tools/bg/backlog-ready-notifier.test.ts @@ -1,12 +1,14 @@ import { describe, expect, test } from "bun:test"; import { DEFAULT_CONFIG, + defaultHistoryFile, parseArgs, parseRow, parsePositiveMinutes, pollOnce, runOnce, type Adapters, + type AssignmentHistory, type BacklogRow, } from "./backlog-ready-notifier"; import type { AgentId, MessageEnvelope, SenderAgentId } from "../bus/types"; @@ -19,12 +21,18 @@ type FakeAssignmentCall = { rationale: string; }; +type HistoryStore = { + read: AssignmentHistory | null; + written: AssignmentHistory[]; +}; + function fakeAdapters( nowIso: string, rows: BacklogRow[], capturedCalls: FakeAssignmentCall[] = [], gitLogStr: string = "", ghPrListStr: string = "", + history: HistoryStore = { read: null, written: [] }, ): Adapters { return { now: () => new Date(nowIso), @@ -46,6 +54,11 @@ function fakeAdapters( }, execGitLog: () => gitLogStr, execGhPrList: () => ghPrListStr, + readHistoryFile: () => history.read, + writeHistoryFile: (_path, h) => { + history.written.push(h); + history.read = h; + }, }; } @@ -445,15 +458,278 @@ title: only a title // clean git log and prs const adapters = fakeAdapters("2026-05-13T18:00:00Z", [ROW_OPEN_NO_DEPS], captured, "", ""); const config = { ...DEFAULT_CONFIG, targetAgent: "testagent" }; - + const result = pollOnce(config, adapters); - + expect(result.queueBusy).toBe(false); expect(result.publishedEnvelopeIds).toHaveLength(1); expect(captured).toHaveLength(1); }); }); + describe("assignment-history cooldown (slice 5a)", () => { + test("defaultHistoryFile honors ZETA_BUS_DIR env var when set", () => { + const before = process.env.ZETA_BUS_DIR; + try { + process.env.ZETA_BUS_DIR = "/var/zeta-test"; + expect(defaultHistoryFile()).toBe("/var/zeta-test/assignment-history.json"); + delete process.env.ZETA_BUS_DIR; + expect(defaultHistoryFile()).toBe("/tmp/zeta-bus/assignment-history.json"); + } finally { + if (before === undefined) delete process.env.ZETA_BUS_DIR; + else process.env.ZETA_BUS_DIR = before; + } + }); + + test("row assigned at T=0; same row at T=15min (within 30min cooldown) → skipped", () => { + const captured: FakeAssignmentCall[] = []; + // Pre-populate history with B-9001 published at T=0. + const history: HistoryStore = { + read: { entries: [{ rowId: "B-9001", publishedAt: "2026-05-13T18:00:00.000Z" }] }, + written: [], + }; + // Poll at T+15min. + const adapters = fakeAdapters( + "2026-05-13T18:15:00.000Z", + [ROW_OPEN_NO_DEPS], + captured, + "", + "", + history, + ); + const result = pollOnce({ ...DEFAULT_CONFIG, cooldownMin: 30 }, adapters); + expect(result.skippedDueToCooldown).toEqual(["B-9001"]); + expect(result.publishedEnvelopeIds).toHaveLength(0); + expect(captured).toHaveLength(0); + expect(result.note).toContain("skipped 1 due to cooldown"); + }); + + test("row assigned at T=0; same row at T=35min (after 30min cooldown) → re-assigned", () => { + const captured: FakeAssignmentCall[] = []; + const history: HistoryStore = { + read: { entries: [{ rowId: "B-9001", publishedAt: "2026-05-13T18:00:00.000Z" }] }, + written: [], + }; + // Poll at T+35min — entry is expired (older than 30min cooldown). + const adapters = fakeAdapters( + "2026-05-13T18:35:00.000Z", + [ROW_OPEN_NO_DEPS], + captured, + "", + "", + history, + ); + const result = pollOnce({ ...DEFAULT_CONFIG, cooldownMin: 30 }, adapters); + expect(result.skippedDueToCooldown).toEqual([]); + expect(result.publishedEnvelopeIds).toHaveLength(1); + expect(captured).toHaveLength(1); + // History rewritten: pruned the stale entry, appended fresh entry. + expect(history.written).toHaveLength(1); + expect(history.written[0]!.entries).toEqual([ + { rowId: "B-9001", publishedAt: "2026-05-13T18:35:00.000Z" }, + ]); + }); + + test("history file absent → first assignment proceeds normally and writes history", () => { + const captured: FakeAssignmentCall[] = []; + const history: HistoryStore = { read: null, written: [] }; + const adapters = fakeAdapters( + "2026-05-13T18:00:00.000Z", + [ROW_OPEN_NO_DEPS], + captured, + "", + "", + history, + ); + const result = pollOnce(DEFAULT_CONFIG, adapters); + expect(result.skippedDueToCooldown).toEqual([]); + expect(result.publishedEnvelopeIds).toHaveLength(1); + expect(history.written).toHaveLength(1); + expect(history.written[0]!.entries[0]).toMatchObject({ + rowId: "B-9001", + publishedAt: "2026-05-13T18:00:00.000Z", + }); + }); + + test("multiple rows in cooldown → only expired rows published; skippedDueToCooldown lists skipped IDs", () => { + const captured: FakeAssignmentCall[] = []; + // B-9001 published 15min ago (still in cooldown); B-9002 published 45min ago (expired). + const history: HistoryStore = { + read: { + entries: [ + { rowId: "B-9001", publishedAt: "2026-05-13T18:00:00.000Z" }, + { rowId: "B-9002", publishedAt: "2026-05-13T17:30:00.000Z" }, + ], + }, + written: [], + }; + const rowB9001: BacklogRow = { ...ROW_OPEN_NO_DEPS, id: "B-9001" }; + const rowB9002: BacklogRow = { ...ROW_OPEN_NO_DEPS, id: "B-9002" }; + const adapters = fakeAdapters( + "2026-05-13T18:15:00.000Z", // T+15min from B-9001; T+45min from B-9002 + [rowB9001, rowB9002], + captured, + "", + "", + history, + ); + const result = pollOnce({ ...DEFAULT_CONFIG, maxAssignments: 10, cooldownMin: 30 }, adapters); + expect(result.skippedDueToCooldown).toEqual(["B-9001"]); + expect(result.publishedEnvelopeIds).toHaveLength(1); + expect(captured.map(c => c.rowId)).toEqual(["B-9002"]); + }); + + test("history pruning: entries older than cooldownMin removed on write", () => { + const captured: FakeAssignmentCall[] = []; + // One ancient entry + one fresh entry from a different row that's about to be re-published-fresh. + const history: HistoryStore = { + read: { + entries: [ + { rowId: "B-OLD", publishedAt: "2026-05-13T17:00:00.000Z" }, // 60min old (expired) + { rowId: "B-RECENT", publishedAt: "2026-05-13T17:50:00.000Z" }, // 10min old (kept) + ], + }, + written: [], + }; + const rowNew: BacklogRow = { ...ROW_OPEN_NO_DEPS, id: "B-NEW" }; + const adapters = fakeAdapters( + "2026-05-13T18:00:00.000Z", + [rowNew], + captured, + "", + "", + history, + ); + const result = pollOnce({ ...DEFAULT_CONFIG, cooldownMin: 30 }, adapters); + expect(result.publishedEnvelopeIds).toHaveLength(1); + // Written history should NOT include B-OLD (pruned) but should keep B-RECENT and add B-NEW. + expect(history.written).toHaveLength(1); + const writtenIds = history.written[0]!.entries.map(e => e.rowId); + expect(writtenIds).not.toContain("B-OLD"); + expect(writtenIds).toContain("B-RECENT"); + expect(writtenIds).toContain("B-NEW"); + }); + + test("--history-file and --cooldown-min flags parse correctly", () => { + const config = parseArgs([ + "--history-file", "/custom/path/assignment-history.json", + "--cooldown-min", "60", + ]); + expect(config.historyFile).toBe("/custom/path/assignment-history.json"); + expect(config.cooldownMin).toBe(60); + }); + + test("--history-file rejects missing value", () => { + expect(() => parseArgs(["--history-file"])).toThrow(/requires a value/); + }); + + test("cooled-down rows do NOT consume maxAssignments quota — later eligible rows still publish (Codex P1 #4449)", () => { + const captured: FakeAssignmentCall[] = []; + // First 3 ready rows are in cooldown; 4th and 5th are eligible. + // With maxAssignments=3, all 3 publishes should go to the 4th, 5th, and... wait we need 3 eligible. + // Re-cast: 3 in cooldown + 3 eligible; maxAssignments=3 must publish the 3 eligible. + const history: HistoryStore = { + read: { + entries: [ + { rowId: "B-COOLED-1", publishedAt: "2026-05-13T18:00:00.000Z" }, + { rowId: "B-COOLED-2", publishedAt: "2026-05-13T18:00:00.000Z" }, + { rowId: "B-COOLED-3", publishedAt: "2026-05-13T18:00:00.000Z" }, + ], + }, + written: [], + }; + const rows: BacklogRow[] = [ + { ...ROW_OPEN_NO_DEPS, id: "B-COOLED-1" }, + { ...ROW_OPEN_NO_DEPS, id: "B-COOLED-2" }, + { ...ROW_OPEN_NO_DEPS, id: "B-COOLED-3" }, + { ...ROW_OPEN_NO_DEPS, id: "B-ELIG-1" }, + { ...ROW_OPEN_NO_DEPS, id: "B-ELIG-2" }, + { ...ROW_OPEN_NO_DEPS, id: "B-ELIG-3" }, + ]; + // Poll at T+15min — cooldown 30min still active for COOLED-* rows. + const adapters = fakeAdapters( + "2026-05-13T18:15:00.000Z", + rows, + captured, + "", + "", + history, + ); + const result = pollOnce({ ...DEFAULT_CONFIG, maxAssignments: 3, cooldownMin: 30 }, adapters); + expect(result.publishedEnvelopeIds).toHaveLength(3); + expect(captured.map(c => c.rowId)).toEqual(["B-ELIG-1", "B-ELIG-2", "B-ELIG-3"]); + expect(result.skippedDueToCooldown).toEqual(["B-COOLED-1", "B-COOLED-2", "B-COOLED-3"]); + }); + + test("readHistoryFile NOT called when noPublish: true (Copilot P1 #4449 — defer history IO)", () => { + const captured: FakeAssignmentCall[] = []; + let readCount = 0; + const baseAdapters = fakeAdapters("2026-05-13T18:00:00Z", [ROW_OPEN_NO_DEPS], captured); + const adapters: Adapters = { + ...baseAdapters, + readHistoryFile: (_path) => { + readCount += 1; + return null; + }, + }; + const result = pollOnce({ ...DEFAULT_CONFIG, noPublish: true }, adapters); + expect(result.publishedEnvelopeIds).toHaveLength(0); + expect(readCount).toBe(0); + }); + + test("readHistoryFile NOT called when readyRows is empty (Copilot P1 #4449 — defer history IO)", () => { + const captured: FakeAssignmentCall[] = []; + let readCount = 0; + const baseAdapters = fakeAdapters("2026-05-13T18:00:00Z", [ROW_CLOSED], captured); + const adapters: Adapters = { + ...baseAdapters, + readHistoryFile: (_path) => { + readCount += 1; + return null; + }, + }; + const result = pollOnce(DEFAULT_CONFIG, adapters); + expect(result.readyRowsFound).toBe(0); + expect(readCount).toBe(0); + }); + + test("read-merge-write preserves concurrent peer's history entry (Codex P1 #4449)", () => { + const captured: FakeAssignmentCall[] = []; + // Initial read: empty (our snapshot says no prior history). + // Pre-write read: peer wrote B-PEER between our two reads. + // We're publishing B-OURS. Expect both B-PEER + B-OURS in the final write. + const initialHistory: AssignmentHistory = { entries: [] }; + const peerWroteBetween: AssignmentHistory = { + entries: [{ rowId: "B-PEER", publishedAt: "2026-05-13T17:55:00.000Z" }], + }; + let readIdx = 0; + const writtenHistory: AssignmentHistory[] = []; + const baseAdapters = fakeAdapters( + "2026-05-13T18:00:00.000Z", + [{ ...ROW_OPEN_NO_DEPS, id: "B-OURS" }], + captured, + ); + const adapters: Adapters = { + ...baseAdapters, + readHistoryFile: () => { + // 1st read: initial (empty); 2nd read: just before write (peer added entry) + const result = readIdx === 0 ? initialHistory : peerWroteBetween; + readIdx += 1; + return result; + }, + writeHistoryFile: (_path, h) => { + writtenHistory.push(h); + }, + }; + const result = pollOnce({ ...DEFAULT_CONFIG, cooldownMin: 30 }, adapters); + expect(result.publishedEnvelopeIds).toHaveLength(1); + expect(captured[0]!.rowId).toBe("B-OURS"); + expect(writtenHistory).toHaveLength(1); + const writtenIds = writtenHistory[0]!.entries.map(e => e.rowId).sort(); + expect(writtenIds).toEqual(["B-OURS", "B-PEER"]); + }); + }); + describe("parseArgs", () => { test("default config when no args", () => { expect(parseArgs([])).toEqual(DEFAULT_CONFIG); diff --git a/tools/bg/backlog-ready-notifier.ts b/tools/bg/backlog-ready-notifier.ts index 3c1888a0b..70cc5074d 100644 --- a/tools/bg/backlog-ready-notifier.ts +++ b/tools/bg/backlog-ready-notifier.ts @@ -12,8 +12,8 @@ // Run: bun tools/bg/backlog-ready-notifier.ts [--once] [--poll-min N] [--backlog-dir PATH] [--no-publish] [--agent NAME] [--to NAME] [--max-assignments N] // Compose with: B-0441 + B-0400 (bus) + B-0440 (reactive peer). -import { readdirSync, readFileSync } from "node:fs"; -import { join } from "node:path"; +import { readdirSync, readFileSync, renameSync, writeFileSync, mkdirSync, mkdtempSync, rmSync } from "node:fs"; +import { dirname, join } from "node:path"; import { publish } from "../bus/bus"; import { AGENT_IDS, SENDER_IDS, type AgentId, type MessageEnvelope, type SenderAgentId } from "../bus/types"; @@ -34,8 +34,21 @@ export type NotifierConfig = { maxAssignments: number; /** Agent whose queue state is checked before assignment */ targetAgent: string; + /** + * Path to the assignment-history JSON file (B-0501 slice 5a — cooldown). + * Default resolves to `${ZETA_BUS_DIR ?? "/tmp/zeta-bus"}/assignment-history.json`. + */ + historyFile: string; + /** Cooldown window in minutes — same rowId is not re-assigned within this window. */ + cooldownMin: number; }; +/** Compute the default history-file path honoring ZETA_BUS_DIR if set. */ +export function defaultHistoryFile(): string { + const dir = process.env.ZETA_BUS_DIR ?? "/tmp/zeta-bus"; + return join(dir, "assignment-history.json"); +} + export const DEFAULT_CONFIG: NotifierConfig = { pollIntervalMin: 10, once: false, @@ -45,6 +58,17 @@ export const DEFAULT_CONFIG: NotifierConfig = { toAgent: "*", maxAssignments: 3, targetAgent: "otto", + historyFile: defaultHistoryFile(), + cooldownMin: 30, +}; + +export type AssignmentHistoryEntry = { + rowId: string; + publishedAt: string; // ISO-8601 +}; + +export type AssignmentHistory = { + entries: AssignmentHistoryEntry[]; }; export type BacklogRow = { @@ -64,6 +88,8 @@ export type PollResult = { /** Structured publish-failure reason; null on success or skip. */ lastPublishError: string | null; queueBusy: boolean; + /** Row IDs that were ready-to-publish but skipped due to cooldown (B-0501). */ + skippedDueToCooldown: string[]; note: string; }; @@ -83,6 +109,16 @@ export type Adapters = { execGitLog: (sinceMinutes: number) => string | null; /** Returns `gh pr list` JSON output, or null if the gh invocation fails (treat as indeterminate). */ execGhPrList: () => string | null; + /** + * Returns parsed assignment history, or null when the file is absent / unreadable / malformed. + * Null is treated as "no prior history" (first-assignment behavior). + */ + readHistoryFile: (path: string) => AssignmentHistory | null; + /** + * Persists the assignment history. Production uses atomic-rename to survive concurrent + * notifier instances writing the same file. + */ + writeHistoryFile: (path: string, history: AssignmentHistory) => void; }; // Keys are lowercase to match the canonical bus agent IDs (SENDER_IDS in tools/bus/types.ts). @@ -196,6 +232,68 @@ const REAL_ADAPTERS: Adapters = { if (result.status !== 0 || result.error) return null; return result.stdout ?? ""; }, + readHistoryFile: (path: string) => { + try { + const raw = readFileSync(path, "utf8"); + const parsed = JSON.parse(raw) as unknown; + if (typeof parsed !== "object" || parsed === null || !Array.isArray((parsed as { entries?: unknown }).entries)) { + return null; + } + const entries = (parsed as { entries: unknown[] }).entries.filter( + (e): e is AssignmentHistoryEntry => + typeof e === "object" && e !== null + && typeof (e as { rowId?: unknown }).rowId === "string" + && typeof (e as { publishedAt?: unknown }).publishedAt === "string", + ); + return { entries }; + } catch { + return null; + } + }, + writeHistoryFile: (path: string, history: AssignmentHistory) => { + // Atomic rename via a PRIVATE temp directory. + // + // Pattern: `mkdtempSync` creates a directory with mode 0700 and a + // cryptographically-unguessable suffix in the parent of `path`. We write + // the history into that private directory then rename onto `path`. This + // defeats: + // - Symlink attack: attacker cannot pre-create our temp path because the + // dir name is unguessable AND world-not-accessible (0700 mode). + // - Concurrent-notifier collision: each instance gets its own private + // subdirectory; no shared temp filename. + // - The CodeQL `js/insecure-temporary-file` rule, which flags any + // predictable filename creation in an OS temp dir even when O_EXCL + // would defeat the attack. + // + // Residual race (B-0501 spec atomic-write-note + a PR #4449 review + // finding): the read-modify-write cycle is not flock-serialised, so two + // concurrent instances can each compute history-deltas from the same + // pre-write snapshot and the later rename wins. The B-0501 spec accepts + // this as best-effort noise. The caller in `pollOnce` mitigates further + // by reading history again RIGHT BEFORE write and merging. + try { + mkdirSync(dirname(path), { recursive: true }); + } catch { + // Directory creation failure is recoverable when the dir already exists; + // writeFileSync below will surface unrecoverable errors. + } + const tmpDir = mkdtempSync(join(dirname(path), ".history-tmp-")); + const tmp = join(tmpDir, "assignment-history.json"); + try { + writeFileSync(tmp, JSON.stringify(history), { flag: "wx" }); + renameSync(tmp, path); + } finally { + // Best-effort cleanup of the private temp dir (the file inside was + // renamed out so the dir is usually empty; rmSync handles either case). + try { + rmSync(tmpDir, { recursive: true, force: true }); + } catch { + // Cleanup failure is non-fatal: the temp dir's contents already moved + // to `path` (the load-bearing operation succeeded); leftover empty + // dir is at worst a noise issue. + } + } + }, }; /** @@ -276,16 +374,37 @@ export function pollOnce( publishedEnvelopeIds: [], lastPublishError: null, queueBusy: true, + skippedDueToCooldown: [], note: `queue busy for ${config.targetAgent} — skip publish`, }; } + // B-0501 slice 5a: cooldown gate. History is read lazily inside the publish + // branch so dry-runs (`--no-publish`) and ready-row-empty polls don't pay the + // disk-IO + JSON-parse cost on every cycle. The scan walks readyRows in order + // and applies cooldown PER ROW so cooled-down rows don't consume the + // maxAssignments quota and silently block later eligible rows from publishing. + const cooldownMs = config.cooldownMin * 60_000; const publishedEnvelopeIds: string[] = []; + const skippedDueToCooldown: string[] = []; + const publishedRowIds: string[] = []; let lastPublishError: string | null = null; + if (!config.noPublish && readyRows.length > 0) { - const toAssign = readyRows.slice(0, config.maxAssignments); - for (const row of toAssign) { + const history: AssignmentHistory = adapters.readHistoryFile(config.historyFile) ?? { entries: [] }; + const activeEntries = new Set( + history.entries + .filter(e => pollAt.getTime() - new Date(e.publishedAt).getTime() < cooldownMs) + .map(e => e.rowId), + ); + + for (const row of readyRows) { + if (publishedEnvelopeIds.length >= config.maxAssignments) break; if (!isValidPriority(row.priority)) continue; + if (activeEntries.has(row.id)) { + skippedDueToCooldown.push(row.id); + continue; + } const rationale = `Ready-to-grind: ${row.id} is open with all deps satisfied. Decomposition discipline (PR #2999) says decompose ambiguous parents into concrete slices.`; try { const envelope = adapters.publishAssignment( @@ -296,6 +415,7 @@ export function pollOnce( rationale, ); publishedEnvelopeIds.push(envelope.id); + publishedRowIds.push(row.id); } catch (e) { // Bus publish failure must NOT kill the poll loop. Captured in // lastPublishError (structured + machine-readable per Riven P1). @@ -303,6 +423,42 @@ export function pollOnce( break; // stop the batch on first failure; next tick retries } } + + // Persist updated history: keep only entries within cooldown, append new publishes. + // Pruning bounds file size as the system runs over time. + // + // Read-merge-write under concurrent notifier instances (PR #4449 review + // finding): re-read the on-disk history immediately before computing the + // next write and union any rowIds another instance recorded between our + // initial read and this point. This reduces (but does not strictly + // eliminate) the read-modify-write race the B-0501 spec atomic-write-note + // classified as acceptable noise. Strict elimination would require flock + // or an append-only log, both out of scope for slice 5a. + if (publishedRowIds.length > 0) { + const onDiskNow = adapters.readHistoryFile(config.historyFile) ?? history; + const ourPublishedSet = new Set(publishedRowIds); + const mergedExisting = [ + // Keep our snapshot's entries plus any extras a concurrent writer added. + // Filter both by cooldown window AND skip dupes our own publishes will re-add. + ...history.entries, + ...onDiskNow.entries.filter(e => !history.entries.some(h => h.rowId === e.rowId && h.publishedAt === e.publishedAt)), + ].filter( + e => pollAt.getTime() - new Date(e.publishedAt).getTime() < cooldownMs + && !ourPublishedSet.has(e.rowId), + ); + const newEntries: AssignmentHistoryEntry[] = [ + ...mergedExisting, + ...publishedRowIds.map(id => ({ rowId: id, publishedAt: pollAt.toISOString() })), + ]; + try { + adapters.writeHistoryFile(config.historyFile, { entries: newEntries }); + } catch (e) { + // History-write failure is non-fatal: the publish already happened. + // Capture in lastPublishError so the operator can see something went wrong. + const msg = e instanceof Error ? e.message : String(e); + lastPublishError = lastPublishError ?? `history-write: ${msg}`; + } + } } const danglingNote = danglingDeps.size > 0 @@ -317,6 +473,10 @@ export function pollOnce( ? ` (published ${publishedEnvelopeIds.length} assignment envelope(s))` : ""; + const cooldownNote = skippedDueToCooldown.length > 0 + ? ` (skipped ${skippedDueToCooldown.length} due to cooldown: ${skippedDueToCooldown.slice(0, 3).join(", ")})` + : ""; + return { pollAt: pollAt.toISOString(), totalOpenRows: openRows.length, @@ -325,8 +485,9 @@ export function pollOnce( publishedEnvelopeIds, lastPublishError, queueBusy: false, + skippedDueToCooldown, note: readyRows.length > 0 - ? `${readyRows.length} of ${openRows.length} open rows are ready-to-grind; top candidates: ${readyRows.slice(0, 5).map(r => r.id).join(", ")}${publishNote}${danglingNote}` + ? `${readyRows.length} of ${openRows.length} open rows are ready-to-grind; top candidates: ${readyRows.slice(0, 5).map(r => r.id).join(", ")}${publishNote}${cooldownNote}${danglingNote}` : `${openRows.length} open rows but none ready${danglingNote}`, }; } @@ -387,6 +548,8 @@ const KNOWN_FLAGS = [ "--to", "--max-assignments", "--target-agent", + "--history-file", + "--cooldown-min", ] as const; export function parseArgs(argv: string[]): NotifierConfig { @@ -414,6 +577,12 @@ export function parseArgs(argv: string[]): NotifierConfig { const next = argv[++i]; if (next === undefined) throw new Error("--target-agent requires a value"); config.targetAgent = next; + } else if (arg === "--history-file") { + const next = argv[++i]; + if (next === undefined) throw new Error("--history-file requires a value"); + config.historyFile = next; + } else if (arg === "--cooldown-min") { + config.cooldownMin = parsePositiveMinutes(argv[++i], "--cooldown-min"); } else { throw new Error(`unknown flag: ${arg}; known flags: ${KNOWN_FLAGS.join(", ")}`); }