diff --git a/docs/backlog/P1/B-0400-inter-agent-ephemeral-communication-bus-nats-protocol.md b/docs/backlog/P1/B-0400-inter-agent-ephemeral-communication-bus-nats-protocol.md index e92b38e85..016cda249 100644 --- a/docs/backlog/P1/B-0400-inter-agent-ephemeral-communication-bus-nats-protocol.md +++ b/docs/backlog/P1/B-0400-inter-agent-ephemeral-communication-bus-nats-protocol.md @@ -92,8 +92,15 @@ P1 — get as many agents to review as possible within a bounded timeframe. This - `status` subcommand (`bun tools/bus/bus.ts status [--json]`) — dashboard of live heartbeats (latest per agent), raw claim messages, pending review requests, shadow-catch count - 9 new tests (60 total across bus.test.ts + claim.test.ts) -**Deferred to slice 5+:** +**Slice 5 scope (feat/b-0400-slice5-bus-gate-integration):** + +- `allActiveClaims()` function in `claim.ts` — returns all active claims across all items (no itemId required) +- `--with-bus-claims` flag for `poll-pr-gate-batch.ts` — appends active bus claims to batch output +- `BusClaimsFn` injectable type for DST coverage in `main()` +- `pollFn` injection added to `main()` for fully deterministic unit tests +- 3 new tests in `poll-pr-gate-batch.test.ts`, 3 new tests in `claim.test.ts` + +**Deferred to slice 6+:** - NATS JetStream transport swap - Named-pipe transport option -- Integration with `poll-pr-gate-batch.ts` for coordinated claims diff --git a/tools/bus/claim.test.ts b/tools/bus/claim.test.ts index 73f85b892..3bdfb7aed 100644 --- a/tools/bus/claim.test.ts +++ b/tools/bus/claim.test.ts @@ -328,3 +328,57 @@ describe("claim.ts — acquire lock cleanup (P1)", () => { expect(lockFiles).toHaveLength(0); }); }); + +// ── allActiveClaims() ───────────────────────────────────────────────────────── + +describe("claim.ts — allActiveClaims()", () => { + beforeEach(() => { TEST_DIR = mkdtempSync(join(tmpdir(), "zeta-claim-test-")); }); + afterEach(cleanTestDir); + + function evalInBus(code: string): { stdout: string; status: number | null } { + const r = spawnSync("bun", ["-e", code], { + encoding: "utf-8", + env: { ...process.env, ZETA_BUS_DIR: TEST_DIR }, + }); + return { stdout: (r.stdout ?? "").trim(), status: r.status }; + } + + test("returns empty array when bus is empty", () => { + const r = evalInBus(` + const { allActiveClaims } = await import(${JSON.stringify(CLAIM_SCRIPT)}); + console.log(JSON.stringify(allActiveClaims())); + `); + expect(r.status).toBe(0); + expect(JSON.parse(r.stdout)).toEqual([]); + }); + + test("returns all active claims across multiple items", () => { + run("acquire", "--from", "otto", "--item", "B-0400"); + run("acquire", "--from", "vera", "--item", "B-0401"); + + const r = evalInBus(` + const { allActiveClaims } = await import(${JSON.stringify(CLAIM_SCRIPT)}); + console.log(JSON.stringify(allActiveClaims())); + `); + expect(r.status).toBe(0); + const claims = JSON.parse(r.stdout) as Array<{ from: string; itemId: string }>; + expect(claims).toHaveLength(2); + const itemIds = claims.map((c) => c.itemId).sort(); + expect(itemIds).toEqual(["B-0400", "B-0401"]); + }); + + test("released items are not included", () => { + run("acquire", "--from", "otto", "--item", "B-0400"); + run("acquire", "--from", "vera", "--item", "B-0401"); + run("release", "--from", "otto", "--item", "B-0400"); + + const r = evalInBus(` + const { allActiveClaims } = await import(${JSON.stringify(CLAIM_SCRIPT)}); + console.log(JSON.stringify(allActiveClaims())); + `); + expect(r.status).toBe(0); + const claims = JSON.parse(r.stdout) as Array<{ itemId: string }>; + expect(claims).toHaveLength(1); + expect(claims[0]!.itemId).toBe("B-0401"); + }); +}); diff --git a/tools/bus/claim.ts b/tools/bus/claim.ts index 28186a4be..bac14760c 100644 --- a/tools/bus/claim.ts +++ b/tools/bus/claim.ts @@ -145,6 +145,54 @@ export function activeClaims(itemId: string): ClaimRecord[] { return records; } +/** + * Returns all active bus claims across every backlog item. + * Single pass over claim-topic messages — latest action per (from, itemId) wins. + */ +export function allActiveClaims(): ClaimRecord[] { + const msgs = list({ topic: "claim" }); + + type Entry = { envelope: MessageEnvelope; mtime: number; idx: number }; + const byKey = new Map(); + + for (let i = 0; i < msgs.length; i++) { + const m = msgs[i]!; + if (!m.payload || typeof m.payload !== "object" || Array.isArray(m.payload)) continue; + const p = m.payload as { action: string; itemId?: unknown; branch?: string }; + if (typeof p.itemId !== "string") continue; + if (p.action !== "claim" && p.action !== "release") continue; + const key = `${m.from}:${p.itemId}`; + const existing = byKey.get(key); + if (!existing) { + byKey.set(key, { envelope: m, mtime: messageMtimeMs(m.id), idx: i }); + continue; + } + if (m.timestamp > existing.envelope.timestamp) { + byKey.set(key, { envelope: m, mtime: messageMtimeMs(m.id), idx: i }); + } else if (m.timestamp === existing.envelope.timestamp) { + const mm = messageMtimeMs(m.id); + if (mm > existing.mtime || (mm === existing.mtime && i > existing.idx)) { + byKey.set(key, { envelope: m, mtime: mm, idx: i }); + } + } + } + + const records: ClaimRecord[] = []; + for (const { envelope: m } of byKey.values()) { + const p = m.payload as { action: string; itemId: string; branch?: string }; + if (p.action !== "claim") continue; + records.push({ + id: m.id, + from: m.from, + itemId: p.itemId, + ...(p.branch !== undefined && { branch: p.branch }), + timestamp: m.timestamp, + expiresAt: m.expiresAt, + }); + } + return records; +} + // ── CLI ─────────────────────────────────────────────────────────────────────── function parseArgs(argv: string[]): { command: string; flags: Record } { diff --git a/tools/github/poll-pr-gate-batch.test.ts b/tools/github/poll-pr-gate-batch.test.ts index a70be10b4..5f0df6f39 100644 --- a/tools/github/poll-pr-gate-batch.test.ts +++ b/tools/github/poll-pr-gate-batch.test.ts @@ -16,12 +16,16 @@ import { describe, expect, test } from "bun:test"; import { + main, pollAllBounded, summarize, + type BatchReport, type BatchSummary, + type BusClaimsFn, type GateReport, type PollOutcome, } from "./poll-pr-gate-batch"; +import type { ClaimRecord } from "../bus/claim.ts"; // Fixed-shape factory keeps tests terse + deterministic. Every field // has a default; tests override only what they're asserting on. @@ -217,3 +221,96 @@ describe("pollAllBounded with injected pollFn", () => { expect(outcomes[2]?.report?.number).toBe(2); }); }); + +// ── main() — --with-bus-claims (B-0400 slice 5) ────────────────────────────── + +// Capture process.stdout.write and restore after each test. +function captureStdout(): { read: () => string; restore: () => void } { + const chunks: string[] = []; + const orig = process.stdout.write.bind(process.stdout); + // Use a compatible function signature to avoid strict-mode overload mismatch. + (process.stdout as unknown as { write: (s: string) => boolean }).write = (s: string) => { + chunks.push(s); + return true; + }; + return { + read: () => chunks.join(""), + restore: () => { process.stdout.write = orig; }, + }; +} + +const fakeClaim: ClaimRecord = { + id: "test-uuid", + from: "otto", + itemId: "B-0400", + branch: "feat/b-0400-slice5", + timestamp: "2026-05-13T00:00:00.000Z", + expiresAt: "2026-05-14T00:00:00.000Z", +}; + +describe("main() — --with-bus-claims flag", () => { + test("busClaimsFn is called and busClaims field is present when flag is passed", async () => { + let called = false; + const busClaimsFn: BusClaimsFn = () => { called = true; return [fakeClaim]; }; + const pollFn = (pr: number): Promise => + Promise.resolve({ number: pr, report: mkReport({ number: pr }) }); + + const cap = captureStdout(); + let code: number; + try { + code = await main(["--with-bus-claims", "1"], busClaimsFn, pollFn); + } finally { + cap.restore(); + } + + expect(code!).toBe(0); + expect(called).toBe(true); + const batch = JSON.parse(cap.read()) as BatchReport; + expect(Array.isArray(batch.busClaims)).toBe(true); + expect(batch.busClaims).toHaveLength(1); + expect(batch.busClaims![0]!.from).toBe("otto"); + expect(batch.busClaims![0]!.itemId).toBe("B-0400"); + }); + + test("busClaimsFn is NOT called and busClaims is absent when flag is omitted", async () => { + let called = false; + const busClaimsFn: BusClaimsFn = () => { called = true; return [fakeClaim]; }; + const pollFn = (pr: number): Promise => + Promise.resolve({ number: pr, report: mkReport({ number: pr }) }); + + const cap = captureStdout(); + let code: number; + try { + code = await main(["1"], busClaimsFn, pollFn); + } finally { + cap.restore(); + } + + expect(code!).toBe(0); + expect(called).toBe(false); + const batch = JSON.parse(cap.read()) as BatchReport; + expect(batch.busClaims).toBeUndefined(); + }); + + test("busClaims serialized as empty array when --with-bus-claims and busClaimsFn returns nothing", async () => { + // Verifies busClaims: [] appears in the batch output when the bus is empty. + // Note: the --all-open empty-PR early-return path is not exercised here because + // listOpenPRs is not injectable in this test harness; that path requires a + // dedicated integration test. + const busClaimsFn: BusClaimsFn = () => []; + const pollFn = (pr: number): Promise => + Promise.resolve({ number: pr, report: mkReport({ number: pr }) }); + + const cap = captureStdout(); + let code: number; + try { + code = await main(["--with-bus-claims", "2"], busClaimsFn, pollFn); + } finally { + cap.restore(); + } + + expect(code!).toBe(0); + const batch = JSON.parse(cap.read()) as BatchReport; + expect(batch.busClaims).toEqual([]); + }); +}); diff --git a/tools/github/poll-pr-gate-batch.ts b/tools/github/poll-pr-gate-batch.ts index d888327ad..6d891450a 100755 --- a/tools/github/poll-pr-gate-batch.ts +++ b/tools/github/poll-pr-gate-batch.ts @@ -56,6 +56,8 @@ import { spawn, spawnSync } from "node:child_process"; import { fileURLToPath } from "node:url"; import { dirname, resolve } from "node:path"; +import { allActiveClaims } from "../bus/claim.ts"; +import type { ClaimRecord } from "../bus/claim.ts"; export interface CheckCounts { ok: number; @@ -105,6 +107,8 @@ export interface BatchReport { summary: BatchSummary; reports: GateReport[]; errors: PollError[]; + /** Active bus claims across all backlog items — present only when --with-bus-claims */ + busClaims?: ClaimRecord[]; } interface ParsedArgs { @@ -114,6 +118,7 @@ interface ParsedArgs { prs: number[]; allOpen: boolean; summaryOnly: boolean; + withBusClaims: boolean; } const HERE = dirname(fileURLToPath(import.meta.url)); @@ -127,6 +132,7 @@ function parseArgs(argv: string[]): ParsedArgs { prs: [], allOpen: false, summaryOnly: false, + withBusClaims: false, }; const requireValue = (flag: string, v: string | undefined): string => { // Reject any value starting with `-` (not just `--`), so that @@ -158,12 +164,15 @@ function parseArgs(argv: string[]): ParsedArgs { out.allOpen = true; } else if (arg === "--summary-only") { out.summaryOnly = true; + } else if (arg === "--with-bus-claims") { + out.withBusClaims = true; } else if (arg === "--help" || arg === "-h") { process.stdout.write( "Usage: poll-pr-gate-batch.ts ...\n" + " poll-pr-gate-batch.ts --all-open [--owner X] [--repo Y]\n" + " poll-pr-gate-batch.ts --concurrency N \n" + - " poll-pr-gate-batch.ts --summary-only --all-open\n", + " poll-pr-gate-batch.ts --summary-only --all-open\n" + + " poll-pr-gate-batch.ts --with-bus-claims 1234 5678\n", ); process.exit(0); } else if (/^\d+$/.test(arg)) { @@ -314,6 +323,9 @@ type PollFn = ( repo: string, ) => Promise; +/** Injectable bus-claims provider — default reads from /tmp/zeta-bus; override in tests. */ +export type BusClaimsFn = () => ClaimRecord[]; + export async function pollAllBounded( prs: number[], owner: string, @@ -386,7 +398,11 @@ export function summarize(reports: GateReport[]): BatchSummary { return { byGate, byNextAction, byState, actionable, warnings }; } -export async function main(argv: string[]): Promise { +export async function main( + argv: string[], + busClaimsFn: BusClaimsFn = allActiveClaims, + pollFn: PollFn = pollOne, +): Promise { const args = parseArgs(argv); const prs = args.allOpen ? listOpenPRs(args.owner, args.repo) : args.prs; if (prs.length === 0) { @@ -405,11 +421,12 @@ export async function main(argv: string[]): Promise { }, reports: [], errors: [], + ...(args.withBusClaims && { busClaims: busClaimsFn() }), }; process.stdout.write(`${JSON.stringify(empty, null, 2)}\n`); return 0; } - const outcomes = await pollAllBounded(prs, args.owner, args.repo, args.concurrency); + const outcomes = await pollAllBounded(prs, args.owner, args.repo, args.concurrency, pollFn); const reports: GateReport[] = []; const errors: PollError[] = []; for (const o of outcomes) { @@ -424,6 +441,7 @@ export async function main(argv: string[]): Promise { summary: summarize(reports), reports: args.summaryOnly ? [] : reports, errors, + ...(args.withBusClaims && { busClaims: busClaimsFn() }), }; process.stdout.write(`${JSON.stringify(batch, null, 2)}\n`); return errors.length > 0 ? 2 : 0; diff --git a/tools/peer-call/grok.ts b/tools/peer-call/grok.ts index fc4da6fc2..83c795c7a 100644 --- a/tools/peer-call/grok.ts +++ b/tools/peer-call/grok.ts @@ -299,7 +299,7 @@ function buildFullPrompt(args: Args): PromptResult { return { ok: true, value: full }; } -function pickModel(mode: Mode): string { +function pickModel(_mode: Mode): string { // cursor-agent's Grok model lineup shifted 2026-05-13: the old // `grok-4-20-thinking` / `grok-4-20` names are no longer in the // available-models list. The current Grok model is `grok-4.3`