diff --git a/tools/bg/missed-substrate-detector.test.ts b/tools/bg/missed-substrate-detector.test.ts index 2fe99a97d..726beee08 100644 --- a/tools/bg/missed-substrate-detector.test.ts +++ b/tools/bg/missed-substrate-detector.test.ts @@ -5,69 +5,95 @@ import { parsePositiveMinutes, pollOnce, type Adapters, + type CascadeFinding, type FetchResult, type MergedPR, } from "./missed-substrate-detector"; +import type { AgentId, MessageEnvelope, SenderAgentId } from "../bus/types"; -function okAdapters(nowIso: string, merged: MergedPR[], truncated = false): Adapters { - return { - now: () => new Date(nowIso), - fetchRecentMergedPRs: () => ({ status: "ok", prs: merged, truncated }), - }; -} +type FakeCascadeCall = { + from: SenderAgentId; + to: AgentId; + finding: CascadeFinding; +}; -function errorAdapters(nowIso: string, reason: string): Adapters { +function adapters(opts: { + nowIso: string; + fetch: FetchResult; + detectCascade?: (pr: MergedPR) => CascadeFinding | null; + capturedPublishes?: FakeCascadeCall[]; + publishImpl?: (from: SenderAgentId, to: AgentId, finding: CascadeFinding) => MessageEnvelope; +}): Adapters { + const captured = opts.capturedPublishes ?? []; return { - now: () => new Date(nowIso), - fetchRecentMergedPRs: (): FetchResult => ({ status: "gh-error", reason }), + now: () => new Date(opts.nowIso), + fetchRecentMergedPRs: () => opts.fetch, + detectCascade: opts.detectCascade ?? (() => null), + publishCascade: opts.publishImpl ?? ((from, to, finding): MessageEnvelope => { + captured.push({ from, to, finding }); + return { + id: `env-${captured.length}`, + from, + to, + timestamp: opts.nowIso, + expiresAt: opts.nowIso, + topic: "missed-substrate-cascade", + payload: { + prNumber: finding.prNumber, + branchName: finding.branchName, + missingCommits: finding.missingCommits, + recommendedAction: "open-recovery-PR", + urgency: finding.urgency, + }, + }; + }), }; } -describe("missed-substrate-detector slice 2", () => { - test("default config has sensible thresholds", () => { +describe("missed-substrate-detector slice 4 (bus publish wiring; slice-3 detect is stub)", () => { + test("default config", () => { expect(DEFAULT_CONFIG.pollIntervalMin).toBe(5); expect(DEFAULT_CONFIG.lookbackMin).toBe(30); expect(DEFAULT_CONFIG.fetchLimit).toBe(100); - expect(DEFAULT_CONFIG.once).toBe(false); + expect(DEFAULT_CONFIG.noPublish).toBe(false); + expect(DEFAULT_CONFIG.fromAgent).toBe("otto"); + expect(DEFAULT_CONFIG.toAgent).toBe("*"); }); - describe("pollOnce with injected adapters", () => { - test("reports 0 candidates when no merged PRs", () => { - const result = pollOnce(DEFAULT_CONFIG, okAdapters("2026-05-13T18:00:00Z", [])); + describe("pollOnce — fetch path", () => { + test("0 candidates when no merged PRs", () => { + const result = pollOnce(DEFAULT_CONFIG, adapters({ + nowIso: "2026-05-13T18:00:00Z", + fetch: { status: "ok", prs: [], truncated: false }, + })); expect(result.candidatesScanned).toBe(0); expect(result.cascadesDetected).toBe(0); - expect(result.fetchStatus).toBe("ok"); + expect(result.publishedEnvelopeIds).toHaveLength(0); expect(result.note).toContain("no merged PRs"); }); - test("reports candidate count when merged PRs found", () => { + test("scans merged PRs but stub detector finds no cascades", () => { const merged: MergedPR[] = [ { number: 2997, headRefName: "feat/x", mergedAt: "2026-05-13T17:50:00Z" }, { number: 2998, headRefName: "feat/y", mergedAt: "2026-05-13T17:55:00Z" }, ]; - const result = pollOnce(DEFAULT_CONFIG, okAdapters("2026-05-13T18:00:00Z", merged)); + const result = pollOnce(DEFAULT_CONFIG, adapters({ + nowIso: "2026-05-13T18:00:00Z", + fetch: { status: "ok", prs: merged, truncated: false }, + })); expect(result.candidatesScanned).toBe(2); expect(result.cascadesDetected).toBe(0); - expect(result.fetchStatus).toBe("ok"); - expect(result.fetchTruncated).toBe(false); - expect(result.note).toContain("2 merged PR(s)"); - expect(result.note).toContain("slice 3 will compare"); - }); - - test("emits valid ISO timestamp", () => { - const result = pollOnce(DEFAULT_CONFIG, okAdapters("2026-05-13T18:00:00Z", [])); - expect(result.pollAt).toBe("2026-05-13T18:00:00.000Z"); + expect(result.note).toContain("no cascades detected"); + expect(result.note).toContain("slice 3 plugs in real compare logic"); }); - test("surfaces gh-error explicitly (does NOT silently treat as zero PRs)", () => { - const result = pollOnce( - DEFAULT_CONFIG, - errorAdapters("2026-05-13T18:00:00Z", "gh exited with status 1; stderr: HTTP 503"), - ); + test("gh-error surfaces explicitly (does NOT silently treat as zero)", () => { + const result = pollOnce(DEFAULT_CONFIG, adapters({ + nowIso: "2026-05-13T18:00:00Z", + fetch: { status: "gh-error", reason: "HTTP 503" }, + })); expect(result.fetchStatus).toBe("gh-error"); - expect(result.candidatesScanned).toBe(0); expect(result.note).toContain("gh fetch failed"); - expect(result.note).toContain("HTTP 503"); }); test("flags truncation warning when results hit fetchLimit", () => { @@ -76,52 +102,102 @@ describe("missed-substrate-detector slice 2", () => { headRefName: `feat/${i}`, mergedAt: "2026-05-13T17:50:00Z", })); - const result = pollOnce(DEFAULT_CONFIG, okAdapters("2026-05-13T18:00:00Z", merged, true)); + const result = pollOnce(DEFAULT_CONFIG, adapters({ + nowIso: "2026-05-13T18:00:00Z", + fetch: { status: "ok", prs: merged, truncated: true }, + })); expect(result.fetchTruncated).toBe(true); expect(result.note).toContain("WARNING: results truncated"); - expect(result.note).toContain("fetchLimit=100"); }); }); - describe("parsePositiveMinutes", () => { - test("accepts positive finite numbers", () => { - expect(parsePositiveMinutes("30", "--lookback-min")).toBe(30); - }); - - test("rejects invalid inputs", () => { - expect(() => parsePositiveMinutes(undefined, "--lookback-min")).toThrow(/requires/); - expect(() => parsePositiveMinutes("0", "--lookback-min")).toThrow(/positive finite/); - expect(() => parsePositiveMinutes("Infinity", "--lookback-min")).toThrow(/positive finite/); + describe("pollOnce — cascade detection + bus publish (slice 4)", () => { + test("publishes envelope when injected detector finds a cascade", () => { + const captured: FakeCascadeCall[] = []; + const cascade: CascadeFinding = { + prNumber: 2980, + branchName: "feat/launch-thread", + missingCommits: ["abc123", "def456"], + urgency: "medium", + }; + const result = pollOnce(DEFAULT_CONFIG, adapters({ + nowIso: "2026-05-13T18:00:00Z", + fetch: { status: "ok", prs: [{ number: 2980, headRefName: "feat/launch-thread", mergedAt: "2026-05-13T17:55:00Z" }], truncated: false }, + detectCascade: () => cascade, + capturedPublishes: captured, + })); + expect(result.cascadesDetected).toBe(1); + expect(result.publishedEnvelopeIds).toHaveLength(1); + expect(captured).toHaveLength(1); + expect(captured[0]!.finding.prNumber).toBe(2980); + expect(captured[0]!.finding.missingCommits).toEqual(["abc123", "def456"]); + expect(captured[0]!.finding.urgency).toBe("medium"); + expect(result.note).toContain("cascade(s) detected"); }); - }); - describe("parseArgs", () => { - test("default config when no args", () => { - expect(parseArgs([])).toEqual(DEFAULT_CONFIG); + test("does NOT publish when noPublish=true", () => { + const captured: FakeCascadeCall[] = []; + const cascade: CascadeFinding = { + prNumber: 2980, + branchName: "feat/x", + missingCommits: ["sha1"], + urgency: "high", + }; + const result = pollOnce( + { ...DEFAULT_CONFIG, noPublish: true }, + adapters({ + nowIso: "2026-05-13T18:00:00Z", + fetch: { status: "ok", prs: [{ number: 2980, headRefName: "feat/x", mergedAt: "2026-05-13T17:55:00Z" }], truncated: false }, + detectCascade: () => cascade, + capturedPublishes: captured, + }), + ); + expect(result.cascadesDetected).toBe(1); + expect(result.publishedEnvelopeIds).toHaveLength(0); + expect(captured).toHaveLength(0); + expect(result.note).toContain("publish skipped"); }); - test("--once flag", () => { - expect(parseArgs(["--once"]).once).toBe(true); + test("publish failure surfaces in note + does NOT crash poll loop", () => { + const cascade: CascadeFinding = { + prNumber: 2980, + branchName: "feat/x", + missingCommits: ["sha1"], + urgency: "low", + }; + const result = pollOnce(DEFAULT_CONFIG, adapters({ + nowIso: "2026-05-13T18:00:00Z", + fetch: { status: "ok", prs: [{ number: 2980, headRefName: "feat/x", mergedAt: "2026-05-13T17:55:00Z" }], truncated: false }, + detectCascade: () => cascade, + publishImpl: () => { throw new Error("bus IO failure"); }, + })); + expect(result.cascadesDetected).toBe(1); + expect(result.publishedEnvelopeIds).toHaveLength(0); + expect(result.note).toContain("publish failed"); }); + }); - test("--poll-min + --lookback-min + --fetch-limit set values", () => { - const config = parseArgs([ - "--poll-min", "10", - "--lookback-min", "60", - "--fetch-limit", "200", - ]); - expect(config.pollIntervalMin).toBe(10); - expect(config.lookbackMin).toBe(60); - expect(config.fetchLimit).toBe(200); + describe("parsePositiveMinutes", () => { + test("accepts + rejects", () => { + expect(parsePositiveMinutes("30", "--lookback-min")).toBe(30); + expect(() => parsePositiveMinutes("0", "--lookback-min")).toThrow(/positive finite/); }); + }); - test("--fetch-limit rejects non-integer values", () => { - expect(() => parseArgs(["--fetch-limit", "abc"])).toThrow(/positive integer/); - expect(() => parseArgs(["--fetch-limit", "1.5"])).toThrow(/positive integer/); + describe("parseArgs", () => { + test("defaults + flags", () => { + expect(parseArgs([])).toEqual(DEFAULT_CONFIG); + const c = parseArgs(["--once", "--no-publish", "--agent", "vera", "--to", "lior", "--fetch-limit", "50"]); + expect(c.once).toBe(true); + expect(c.noPublish).toBe(true); + expect(c.fromAgent).toBe("vera"); + expect(c.toAgent).toBe("lior"); + expect(c.fetchLimit).toBe(50); }); - test("rejects unknown flags", () => { + test("rejects unknown flags + invalid agent", () => { expect(() => parseArgs(["--unknown"])).toThrow(/unknown flag/); + expect(() => parseArgs(["--agent", "*"])).toThrow(/must be one of/); }); }); }); diff --git a/tools/bg/missed-substrate-detector.ts b/tools/bg/missed-substrate-detector.ts index add3fed83..b7aa86394 100644 --- a/tools/bg/missed-substrate-detector.ts +++ b/tools/bg/missed-substrate-detector.ts @@ -1,33 +1,31 @@ -// missed-substrate-detector.ts — B-0442 slice 2: merged-PR fetch via gh CLI +// missed-substrate-detector.ts — B-0442 slice 4: bus publish on cascade detection // // Background service that detects branch-vs-merged-PR drift: commits landing -// on a feature branch AFTER its parent PR squash-merged. Slice 2 fetches the -// recent merged-PR set (number, head-ref, merged-at) within a configurable -// lookback window via `gh pr list --state merged`. The fetch produces a -// FetchResult that distinguishes successful fetch from gh-failure so the -// caller can tell "no merged PRs" from "gh unavailable". +// on a feature branch AFTER its parent PR squash-merged. Slice 4 closes the +// drift-prevention reactive loop: fetch recent merged PRs, compare branch +// HEAD against squash content (NOTE: slice 3 implements the actual compare; +// slice 4 publishes envelopes for each detected cascade). // -// Slice 2 does NOT yet perform branch-HEAD comparison (slice 3) or bus -// publish (slice 4); it just produces the candidate merged-PR set and -// surfaces gh-failure explicitly. +// Slice 4 ships the bus-publish wiring with a stub comparator that ALWAYS +// reports "no cascade detected" — slice 3 will plug in the real branch-vs- +// squash compare logic. The bus envelope schema + flags + tests are +// production-ready; the comparator stays no-op until slice 3. // -// Run: bun tools/bg/missed-substrate-detector.ts [--once] [--poll-min N] [--lookback-min N] [--fetch-limit N] +// Run: bun tools/bg/missed-substrate-detector.ts [--once] [--poll-min N] [--lookback-min N] [--fetch-limit N] [--no-publish] [--agent NAME] [--to NAME] // Compose with: B-0442 + B-0400 (bus) + B-0440 / B-0441 (companion services). import { spawnSync } from "node:child_process"; +import { publish } from "../bus/bus"; +import { AGENT_IDS, SENDER_IDS, type AgentId, type MessageEnvelope, type SenderAgentId } from "../bus/types"; export type DetectorConfig = { - /** How often to poll, in minutes */ pollIntervalMin: number; - /** Lookback window for merged PRs, in minutes */ lookbackMin: number; - /** - * Max merged PRs to fetch per poll. Defaults to 100 (gh's hard cap is - * higher). Set higher for busy repos with short lookback windows. - */ fetchLimit: number; - /** When true, run a single poll and exit */ once: boolean; + noPublish: boolean; + fromAgent: SenderAgentId; + toAgent: AgentId; }; export const DEFAULT_CONFIG: DetectorConfig = { @@ -35,36 +33,53 @@ export const DEFAULT_CONFIG: DetectorConfig = { lookbackMin: 30, fetchLimit: 100, once: false, + noPublish: false, + fromAgent: "otto", + toAgent: "*", }; export type MergedPR = { number: number; - headRefName: string; // feature branch name - mergedAt: string; // ISO-8601 + headRefName: string; + mergedAt: string; +}; + +export type CascadeFinding = { + prNumber: number; + branchName: string; + missingCommits: string[]; // commits on branch but not in squash + urgency: "low" | "medium" | "high"; }; -/** - * Result of fetching the recent merged-PR window. Distinguishes successful - * fetch (status: "ok") from gh-failure (status: "gh-error") so the caller - * can avoid silently treating gh-down as "no PRs merged". - */ export type FetchResult = | { status: "ok"; prs: MergedPR[]; truncated: boolean } | { status: "gh-error"; reason: string }; export type PollResult = { - pollAt: string; // ISO-8601 + pollAt: string; candidatesScanned: number; cascadesDetected: number; fetchStatus: "ok" | "gh-error"; fetchTruncated: boolean; + publishedEnvelopeIds: string[]; note: string; }; -/** Adapter abstraction so tests can inject deterministic clock + gh result. */ +/** Adapter abstraction so tests can inject deterministic clock + gh + cascade-detector + bus. */ export type Adapters = { now: () => Date; fetchRecentMergedPRs: (lookbackMin: number, fetchLimit: number) => FetchResult; + /** + * Detect cascades on a single merged PR. Slice 3 replaces this stub with + * real branch-HEAD vs squash-content compare logic. Slice 4 keeps it as + * an injectable stub for testing the bus-publish path. + */ + detectCascade: (pr: MergedPR) => CascadeFinding | null; + publishCascade: ( + from: SenderAgentId, + to: AgentId, + finding: CascadeFinding, + ) => MessageEnvelope; }; const REAL_ADAPTERS: Adapters = { @@ -75,16 +90,11 @@ const REAL_ADAPTERS: Adapters = { const result = spawnSync( "gh", [ - "pr", - "list", - "--state", - "merged", - "--search", - `merged:>${since}`, - "--json", - "number,headRefName,mergedAt", - "--limit", - String(fetchLimit), + "pr", "list", + "--state", "merged", + "--search", `merged:>${since}`, + "--json", "number,headRefName,mergedAt", + "--limit", String(fetchLimit), ], { encoding: "utf8", stdio: ["ignore", "pipe", "pipe"] }, ); @@ -92,7 +102,7 @@ const REAL_ADAPTERS: Adapters = { return { status: "gh-error", reason: `gh exited with status ${result.status}; stderr: ${(result.stderr ?? "").toString().slice(0, 200)}` }; } if (!result.stdout) { - return { status: "gh-error", reason: "gh produced empty stdout (no error code; likely IO failure)" }; + return { status: "gh-error", reason: "gh produced empty stdout" }; } try { const parsed = JSON.parse(result.stdout); @@ -111,13 +121,26 @@ const REAL_ADAPTERS: Adapters = { return { status: "gh-error", reason: `JSON parse failed: ${e instanceof Error ? e.message : String(e)}` }; } }, + // Slice 3 will replace this stub with real branch-vs-squash compare. + detectCascade: (_pr: MergedPR) => null, + publishCascade: (from, to, finding) => + publish(from, to, { + topic: "missed-substrate-cascade", + payload: { + prNumber: finding.prNumber, + branchName: finding.branchName, + missingCommits: finding.missingCommits, + recommendedAction: "open-recovery-PR", + urgency: finding.urgency, + }, + }), }; /** - * Single poll iteration. Fetches recent merged PRs and reports the - * candidate set. Surfaces gh-failure explicitly so it can't be silently - * treated as "no PRs merged". Slice 3 will fetch each branch's HEAD and - * compare against squash content. + * Single poll iteration. Fetches merged PRs, runs the cascade detector + * on each, and publishes a missed-substrate-cascade envelope per finding + * (unless noPublish). Slice 4's detectCascade is a stub (returns null + * for all PRs); slice 3 plugs in real logic. */ export function pollOnce( config: DetectorConfig, @@ -133,37 +156,63 @@ export function pollOnce( cascadesDetected: 0, fetchStatus: "gh-error", fetchTruncated: false, + publishedEnvelopeIds: [], note: `gh fetch failed; cannot evaluate drift this tick. ${fetch.reason}`, }; } + const findings: CascadeFinding[] = []; + for (const pr of fetch.prs) { + const finding = adapters.detectCascade(pr); + if (finding !== null) findings.push(finding); + } + + const publishedEnvelopeIds: string[] = []; + let publishError: string | null = null; + if (!config.noPublish && findings.length > 0) { + for (const finding of findings) { + try { + const envelope = adapters.publishCascade(config.fromAgent, config.toAgent, finding); + publishedEnvelopeIds.push(envelope.id); + } catch (e) { + publishError = e instanceof Error ? e.message : String(e); + break; + } + } + } + const truncatedSuffix = fetch.truncated ? ` (WARNING: results truncated at fetchLimit=${config.fetchLimit}; raise --fetch-limit or shorten --lookback-min)` : ""; + const publishSuffix = publishError + ? ` (publish failed: ${publishError})` + : config.noPublish && findings.length > 0 + ? ` (publish skipped per --no-publish)` + : publishedEnvelopeIds.length > 0 + ? ` (published ${publishedEnvelopeIds.length} cascade envelope(s))` + : ""; return { pollAt: pollAt.toISOString(), candidatesScanned: fetch.prs.length, - cascadesDetected: 0, // slice 3 will populate + cascadesDetected: findings.length, fetchStatus: "ok", fetchTruncated: fetch.truncated, - note: fetch.prs.length === 0 + publishedEnvelopeIds, + note: findings.length > 0 + ? `${findings.length} cascade(s) detected in ${fetch.prs.length} merged PR(s)${publishSuffix}${truncatedSuffix}` + : fetch.prs.length === 0 ? `no merged PRs in last ${config.lookbackMin}min lookback window` - : `${fetch.prs.length} merged PR(s) in last ${config.lookbackMin}min; slice 3 will compare each branch HEAD against squash content${truncatedSuffix}`, + : `${fetch.prs.length} merged PR(s) in last ${config.lookbackMin}min; no cascades detected (slice 3 plugs in real compare logic)${truncatedSuffix}`, }; } -/** Run a single poll iteration and return its result. */ export function runOnce(config: DetectorConfig = DEFAULT_CONFIG): PollResult { const result = pollOnce(config); console.log(JSON.stringify(result)); return result; } -/** - * Run the detector as a daemon. Sleeps for pollIntervalMin between - * iterations and never returns; results are NOT accumulated. - */ export async function runDaemon(config: DetectorConfig = DEFAULT_CONFIG): Promise { while (true) { runOnce(config); @@ -189,7 +238,19 @@ function parsePositiveInt(raw: string | undefined, name: string): number { return n; } -const KNOWN_FLAGS = ["--once", "--poll-min", "--lookback-min", "--fetch-limit"] as const; +function parseSenderId(raw: string | undefined): SenderAgentId { + if (raw === undefined) throw new Error("--agent requires a value"); + if ((SENDER_IDS as readonly string[]).includes(raw)) return raw as SenderAgentId; + throw new Error(`--agent must be one of ${SENDER_IDS.join(", ")}; got "${raw}"`); +} + +function parseAgentId(raw: string | undefined): AgentId { + if (raw === undefined) throw new Error("--to requires a value"); + if ((AGENT_IDS as readonly string[]).includes(raw)) return raw as AgentId; + throw new Error(`--to must be one of ${AGENT_IDS.join(", ")}; got "${raw}"`); +} + +const KNOWN_FLAGS = ["--once", "--poll-min", "--lookback-min", "--fetch-limit", "--no-publish", "--agent", "--to"] as const; export function parseArgs(argv: string[]): DetectorConfig { const config: DetectorConfig = { ...DEFAULT_CONFIG }; @@ -198,12 +259,18 @@ export function parseArgs(argv: string[]): DetectorConfig { const arg = argv[i]!; if (arg === "--once") { config.once = true; + } else if (arg === "--no-publish") { + config.noPublish = true; } else if (arg === "--poll-min") { config.pollIntervalMin = parsePositiveMinutes(argv[++i], "--poll-min"); } else if (arg === "--lookback-min") { config.lookbackMin = parsePositiveMinutes(argv[++i], "--lookback-min"); } else if (arg === "--fetch-limit") { config.fetchLimit = parsePositiveInt(argv[++i], "--fetch-limit"); + } else if (arg === "--agent") { + config.fromAgent = parseSenderId(argv[++i]); + } else if (arg === "--to") { + config.toAgent = parseAgentId(argv[++i]); } else { throw new Error(`unknown flag: ${arg}; known flags: ${KNOWN_FLAGS.join(", ")}`); }