Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,15 @@ P1 — get as many agents to review as possible within a bounded timeframe. This
- `status` subcommand (`bun tools/bus/bus.ts status [--json]`) — dashboard of live heartbeats (latest per agent), raw claim messages, pending review requests, shadow-catch count
- 9 new tests (60 total across bus.test.ts + claim.test.ts)

**Deferred to slice 5+:**
**Slice 5 scope (feat/b-0400-slice5-bus-gate-integration):**

- `allActiveClaims()` function in `claim.ts` — returns all active claims across all items (no itemId required)
- `--with-bus-claims` flag for `poll-pr-gate-batch.ts` — appends active bus claims to batch output
- `BusClaimsFn` injectable type for DST coverage in `main()`
- `pollFn` injection added to `main()` for fully deterministic unit tests
- 3 new tests in `poll-pr-gate-batch.test.ts`, 3 new tests in `claim.test.ts`

**Deferred to slice 6+:**

- NATS JetStream transport swap
- Named-pipe transport option
- Integration with `poll-pr-gate-batch.ts` for coordinated claims
54 changes: 54 additions & 0 deletions tools/bus/claim.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -328,3 +328,57 @@ describe("claim.ts — acquire lock cleanup (P1)", () => {
expect(lockFiles).toHaveLength(0);
});
});

// ── allActiveClaims() ─────────────────────────────────────────────────────────

describe("claim.ts — allActiveClaims()", () => {
beforeEach(() => { TEST_DIR = mkdtempSync(join(tmpdir(), "zeta-claim-test-")); });
afterEach(cleanTestDir);

function evalInBus(code: string): { stdout: string; status: number | null } {
const r = spawnSync("bun", ["-e", code], {
encoding: "utf-8",
env: { ...process.env, ZETA_BUS_DIR: TEST_DIR },
});
return { stdout: (r.stdout ?? "").trim(), status: r.status };
}

test("returns empty array when bus is empty", () => {
const r = evalInBus(`
const { allActiveClaims } = await import(${JSON.stringify(CLAIM_SCRIPT)});
console.log(JSON.stringify(allActiveClaims()));
`);
expect(r.status).toBe(0);
expect(JSON.parse(r.stdout)).toEqual([]);
});

test("returns all active claims across multiple items", () => {
run("acquire", "--from", "otto", "--item", "B-0400");
run("acquire", "--from", "vera", "--item", "B-0401");

const r = evalInBus(`
const { allActiveClaims } = await import(${JSON.stringify(CLAIM_SCRIPT)});
console.log(JSON.stringify(allActiveClaims()));
`);
expect(r.status).toBe(0);
const claims = JSON.parse(r.stdout) as Array<{ from: string; itemId: string }>;
expect(claims).toHaveLength(2);
const itemIds = claims.map((c) => c.itemId).sort();
expect(itemIds).toEqual(["B-0400", "B-0401"]);
});

test("released items are not included", () => {
run("acquire", "--from", "otto", "--item", "B-0400");
run("acquire", "--from", "vera", "--item", "B-0401");
run("release", "--from", "otto", "--item", "B-0400");

const r = evalInBus(`
const { allActiveClaims } = await import(${JSON.stringify(CLAIM_SCRIPT)});
console.log(JSON.stringify(allActiveClaims()));
`);
expect(r.status).toBe(0);
const claims = JSON.parse(r.stdout) as Array<{ itemId: string }>;
expect(claims).toHaveLength(1);
expect(claims[0]!.itemId).toBe("B-0401");
});
});
48 changes: 48 additions & 0 deletions tools/bus/claim.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,54 @@ export function activeClaims(itemId: string): ClaimRecord[] {
return records;
}

/**
* Returns all active bus claims across every backlog item.
* Single pass over claim-topic messages — latest action per (from, itemId) wins.
*/
export function allActiveClaims(): ClaimRecord[] {
const msgs = list({ topic: "claim" });

type Entry = { envelope: MessageEnvelope; mtime: number; idx: number };
const byKey = new Map<string, Entry>();

for (let i = 0; i < msgs.length; i++) {
const m = msgs[i]!;
if (!m.payload || typeof m.payload !== "object" || Array.isArray(m.payload)) continue;
const p = m.payload as { action: string; itemId?: unknown; branch?: string };
if (typeof p.itemId !== "string") continue;
if (p.action !== "claim" && p.action !== "release") continue;
const key = `${m.from}:${p.itemId}`;
const existing = byKey.get(key);
if (!existing) {
byKey.set(key, { envelope: m, mtime: messageMtimeMs(m.id), idx: i });
continue;
}
if (m.timestamp > existing.envelope.timestamp) {
byKey.set(key, { envelope: m, mtime: messageMtimeMs(m.id), idx: i });
} else if (m.timestamp === existing.envelope.timestamp) {
const mm = messageMtimeMs(m.id);
if (mm > existing.mtime || (mm === existing.mtime && i > existing.idx)) {
byKey.set(key, { envelope: m, mtime: mm, idx: i });
}
}
}

const records: ClaimRecord[] = [];
for (const { envelope: m } of byKey.values()) {
const p = m.payload as { action: string; itemId: string; branch?: string };
if (p.action !== "claim") continue;
records.push({
id: m.id,
from: m.from,
itemId: p.itemId,
...(p.branch !== undefined && { branch: p.branch }),
timestamp: m.timestamp,
expiresAt: m.expiresAt,
});
}
return records;
}

// ── CLI ───────────────────────────────────────────────────────────────────────

function parseArgs(argv: string[]): { command: string; flags: Record<string, string> } {
Expand Down
97 changes: 97 additions & 0 deletions tools/github/poll-pr-gate-batch.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@

import { describe, expect, test } from "bun:test";
import {
main,
pollAllBounded,
summarize,
type BatchReport,
type BatchSummary,
type BusClaimsFn,
type GateReport,
type PollOutcome,
} from "./poll-pr-gate-batch";
import type { ClaimRecord } from "../bus/claim.ts";

// Fixed-shape factory keeps tests terse + deterministic. Every field
// has a default; tests override only what they're asserting on.
Expand Down Expand Up @@ -217,3 +221,96 @@ describe("pollAllBounded with injected pollFn", () => {
expect(outcomes[2]?.report?.number).toBe(2);
});
});

// ── main() — --with-bus-claims (B-0400 slice 5) ──────────────────────────────

// Capture process.stdout.write and restore after each test.
function captureStdout(): { read: () => string; restore: () => void } {
const chunks: string[] = [];
const orig = process.stdout.write.bind(process.stdout);
// Use a compatible function signature to avoid strict-mode overload mismatch.
(process.stdout as unknown as { write: (s: string) => boolean }).write = (s: string) => {
chunks.push(s);
return true;
};
return {
read: () => chunks.join(""),
restore: () => { process.stdout.write = orig; },
};
}

const fakeClaim: ClaimRecord = {
id: "test-uuid",
from: "otto",
itemId: "B-0400",
branch: "feat/b-0400-slice5",
timestamp: "2026-05-13T00:00:00.000Z",
expiresAt: "2026-05-14T00:00:00.000Z",
};

describe("main() — --with-bus-claims flag", () => {
test("busClaimsFn is called and busClaims field is present when flag is passed", async () => {
let called = false;
const busClaimsFn: BusClaimsFn = () => { called = true; return [fakeClaim]; };
const pollFn = (pr: number): Promise<PollOutcome> =>
Promise.resolve({ number: pr, report: mkReport({ number: pr }) });

const cap = captureStdout();
let code: number;
try {
code = await main(["--with-bus-claims", "1"], busClaimsFn, pollFn);
} finally {
cap.restore();
}

expect(code!).toBe(0);
expect(called).toBe(true);
const batch = JSON.parse(cap.read()) as BatchReport;
expect(Array.isArray(batch.busClaims)).toBe(true);
expect(batch.busClaims).toHaveLength(1);
expect(batch.busClaims![0]!.from).toBe("otto");
expect(batch.busClaims![0]!.itemId).toBe("B-0400");
});

test("busClaimsFn is NOT called and busClaims is absent when flag is omitted", async () => {
let called = false;
const busClaimsFn: BusClaimsFn = () => { called = true; return [fakeClaim]; };
const pollFn = (pr: number): Promise<PollOutcome> =>
Promise.resolve({ number: pr, report: mkReport({ number: pr }) });

const cap = captureStdout();
let code: number;
try {
code = await main(["1"], busClaimsFn, pollFn);
} finally {
cap.restore();
}

expect(code!).toBe(0);
expect(called).toBe(false);
const batch = JSON.parse(cap.read()) as BatchReport;
expect(batch.busClaims).toBeUndefined();
});

test("busClaims serialized as empty array when --with-bus-claims and busClaimsFn returns nothing", async () => {
// Verifies busClaims: [] appears in the batch output when the bus is empty.
// Note: the --all-open empty-PR early-return path is not exercised here because
// listOpenPRs is not injectable in this test harness; that path requires a
// dedicated integration test.
const busClaimsFn: BusClaimsFn = () => [];
const pollFn = (pr: number): Promise<PollOutcome> =>
Promise.resolve({ number: pr, report: mkReport({ number: pr }) });

const cap = captureStdout();
let code: number;
try {
code = await main(["--with-bus-claims", "2"], busClaimsFn, pollFn);
} finally {
cap.restore();
}

expect(code!).toBe(0);
const batch = JSON.parse(cap.read()) as BatchReport;
expect(batch.busClaims).toEqual([]);
});
});
24 changes: 21 additions & 3 deletions tools/github/poll-pr-gate-batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
import { spawn, spawnSync } from "node:child_process";
import { fileURLToPath } from "node:url";
import { dirname, resolve } from "node:path";
import { allActiveClaims } from "../bus/claim.ts";
import type { ClaimRecord } from "../bus/claim.ts";

export interface CheckCounts {
ok: number;
Expand Down Expand Up @@ -105,6 +107,8 @@ export interface BatchReport {
summary: BatchSummary;
reports: GateReport[];
errors: PollError[];
/** Active bus claims across all backlog items — present only when --with-bus-claims */
busClaims?: ClaimRecord[];
}

interface ParsedArgs {
Expand All @@ -114,6 +118,7 @@ interface ParsedArgs {
prs: number[];
allOpen: boolean;
summaryOnly: boolean;
withBusClaims: boolean;
}

const HERE = dirname(fileURLToPath(import.meta.url));
Expand All @@ -127,6 +132,7 @@ function parseArgs(argv: string[]): ParsedArgs {
prs: [],
allOpen: false,
summaryOnly: false,
withBusClaims: false,
};
const requireValue = (flag: string, v: string | undefined): string => {
// Reject any value starting with `-` (not just `--`), so that
Expand Down Expand Up @@ -158,12 +164,15 @@ function parseArgs(argv: string[]): ParsedArgs {
out.allOpen = true;
} else if (arg === "--summary-only") {
out.summaryOnly = true;
} else if (arg === "--with-bus-claims") {
out.withBusClaims = true;
} else if (arg === "--help" || arg === "-h") {
process.stdout.write(
"Usage: poll-pr-gate-batch.ts <PR1> <PR2> ...\n" +
" poll-pr-gate-batch.ts --all-open [--owner X] [--repo Y]\n" +
" poll-pr-gate-batch.ts --concurrency N <PRs...>\n" +
" poll-pr-gate-batch.ts --summary-only --all-open\n",
" poll-pr-gate-batch.ts --summary-only --all-open\n" +
" poll-pr-gate-batch.ts --with-bus-claims 1234 5678\n",
);
process.exit(0);
} else if (/^\d+$/.test(arg)) {
Expand Down Expand Up @@ -314,6 +323,9 @@ type PollFn = (
repo: string,
) => Promise<PollOutcome>;

/** Injectable bus-claims provider — default reads from /tmp/zeta-bus; override in tests. */
export type BusClaimsFn = () => ClaimRecord[];

export async function pollAllBounded(
prs: number[],
owner: string,
Expand Down Expand Up @@ -386,7 +398,11 @@ export function summarize(reports: GateReport[]): BatchSummary {
return { byGate, byNextAction, byState, actionable, warnings };
}

export async function main(argv: string[]): Promise<number> {
export async function main(
argv: string[],
busClaimsFn: BusClaimsFn = allActiveClaims,
pollFn: PollFn = pollOne,
): Promise<number> {
const args = parseArgs(argv);
const prs = args.allOpen ? listOpenPRs(args.owner, args.repo) : args.prs;
if (prs.length === 0) {
Expand All @@ -405,11 +421,12 @@ export async function main(argv: string[]): Promise<number> {
},
reports: [],
errors: [],
...(args.withBusClaims && { busClaims: busClaimsFn() }),
};
process.stdout.write(`${JSON.stringify(empty, null, 2)}\n`);
return 0;
}
const outcomes = await pollAllBounded(prs, args.owner, args.repo, args.concurrency);
const outcomes = await pollAllBounded(prs, args.owner, args.repo, args.concurrency, pollFn);
const reports: GateReport[] = [];
const errors: PollError[] = [];
for (const o of outcomes) {
Expand All @@ -424,6 +441,7 @@ export async function main(argv: string[]): Promise<number> {
summary: summarize(reports),
reports: args.summaryOnly ? [] : reports,
errors,
...(args.withBusClaims && { busClaims: busClaimsFn() }),
};
process.stdout.write(`${JSON.stringify(batch, null, 2)}\n`);
return errors.length > 0 ? 2 : 0;
Expand Down
2 changes: 1 addition & 1 deletion tools/peer-call/grok.ts
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ function buildFullPrompt(args: Args): PromptResult {
return { ok: true, value: full };
}

function pickModel(mode: Mode): string {
function pickModel(_mode: Mode): string {
// cursor-agent's Grok model lineup shifted 2026-05-13: the old
// `grok-4-20-thinking` / `grok-4-20` names are no longer in the
// available-models list. The current Grok model is `grok-4.3`
Expand Down
Loading