diff --git a/docs/backlog/P1/B-0449-bg-services-slice-5-subscriber-agent-design-pass-2026-05-13.md b/docs/backlog/P1/B-0449-bg-services-slice-5-subscriber-agent-design-pass-2026-05-13.md index b6fc3938a..1a5ae5e44 100644 --- a/docs/backlog/P1/B-0449-bg-services-slice-5-subscriber-agent-design-pass-2026-05-13.md +++ b/docs/backlog/P1/B-0449-bg-services-slice-5-subscriber-agent-design-pass-2026-05-13.md @@ -1,7 +1,7 @@ --- id: B-0449 priority: P1 -status: open +status: shipped title: "bg-services slice 5 — subscriber-agent architecture design pass (closes the foreground-optional architectural claim)" tier: factory-infrastructure effort: M @@ -98,7 +98,7 @@ queue work into step 3 (pick speculative work). ## Acceptance criteria (design-pass) -- [ ] Library `tools/bus/subscribe.ts` exports `subscribeOnce(topic, handler)` that: +- [x] Library `tools/bus/subscribe.ts` exports `subscribeOnce(topic, handler)` that: - Reads the bus directory (honors `ZETA_BUS_DIR` env var; defaults to `/tmp/zeta-bus/` — same configurable convention the existing `tools/bus/bus.ts` + `tools/bus/claim.ts` already use, so production @@ -108,17 +108,17 @@ queue work into step 3 (pick speculative work). - Calls handler(envelope) for each match - Marks-as-consumed via a `seen.json` file per surface in the same bus directory (prevents re-processing; honors `ZETA_BUS_DIR`) -- [ ] `docs/AUTONOMOUS-LOOP-PER-TICK.md` step 1 (refresh) updated to call +- [x] `docs/AUTONOMOUS-LOOP-PER-TICK.md` step 1 (refresh) updated to call `subscribeOnce` for each of the three topics -- [ ] Per-topic handlers are STUBS in this slice — they log envelope +- [x] Per-topic handlers are STUBS in this slice — they log envelope to tick shard but take no action. Subsequent slices flesh out: - `infinite-backlog-nudge` handler → triggers decomposition or backlog grind (slice 5.1) - `work-assignment` handler → claim-and-implement an ambiguous item (slice 5.2) - `missed-substrate-cascade` handler → open recovery PR (slice 5.3) -- [ ] Tests cover `subscribeOnce` (DST-replayable with fake bus dir) -- [ ] Substrate-honest disclaimer in `tools/bg/README.md` updated: +- [x] Tests cover `subscribeOnce` (DST-replayable with fake bus dir) +- [x] Substrate-honest disclaimer in `tools/bg/README.md` updated: "subscribers consume envelopes but actions are STUB; slice 5.N flesh out per-topic behavior" diff --git a/tools/bus/subscribe.test.ts b/tools/bus/subscribe.test.ts new file mode 100644 index 000000000..c17428b23 --- /dev/null +++ b/tools/bus/subscribe.test.ts @@ -0,0 +1,69 @@ +import { describe, expect, test, mock } from "bun:test"; +import { subscribeOnce } from "./subscribe"; +import type { MessageEnvelope, Topic } from "./types"; +import { rmSync } from "node:fs"; +import { join } from "node:path"; +import { BUS_DIR } from "./bus"; + +describe("bus subscribeOnce (B-0449 slice 5)", () => { + const seenFile = join(BUS_DIR, "seen-test-surface.json"); + + // Helper to clear state + function clearState() { + try { rmSync(seenFile); } catch {} + } + + test("calls handler for unseen envelopes and records seen state", async () => { + clearState(); + + const env1: MessageEnvelope = { + id: "env-1", + from: "otto", + to: "test-surface" as any, + timestamp: new Date().toISOString(), + expiresAt: new Date(Date.now() + 10000).toISOString(), + topic: "work-assignment", + payload: { rowId: "B-1234", priority: "P1", rationale: "test" }, + }; + + const fakeList = mock((_opts: { topic?: Topic; to?: string }) => { + return [env1]; + }); + + const handler = mock(async (env) => {}); + + await subscribeOnce("work-assignment", "test-surface", handler, { list: fakeList as any }); + + expect(handler).toHaveBeenCalledTimes(1); + expect(handler).toHaveBeenCalledWith(env1); + + // Call again, should not trigger handler because it was recorded in seen-test-surface.json + await subscribeOnce("work-assignment", "test-surface", handler, { list: fakeList as any }); + expect(handler).toHaveBeenCalledTimes(1); // Still 1 + }); + + test("does not mark as seen if handler throws", async () => { + clearState(); + + const env2: MessageEnvelope = { + id: "env-2", + from: "otto", + to: "test-surface" as any, + timestamp: new Date().toISOString(), + expiresAt: new Date(Date.now() + 10000).toISOString(), + topic: "work-assignment", + payload: { rowId: "B-2222", priority: "P2", rationale: "test2" }, + }; + + const fakeList = mock(() => [env2]); + const handlerFailing = mock(async (env) => { throw new Error("fail"); }); + + await subscribeOnce("work-assignment", "test-surface", handlerFailing, { list: fakeList as any }); + + expect(handlerFailing).toHaveBeenCalledTimes(1); + + // Call again, should retry because it failed and wasn't marked seen + await subscribeOnce("work-assignment", "test-surface", handlerFailing, { list: fakeList as any }); + expect(handlerFailing).toHaveBeenCalledTimes(2); + }); +}); diff --git a/tools/bus/subscribe.ts b/tools/bus/subscribe.ts new file mode 100644 index 000000000..928f85efd --- /dev/null +++ b/tools/bus/subscribe.ts @@ -0,0 +1,57 @@ +import { join } from "node:path"; +import { existsSync, readFileSync, writeFileSync } from "node:fs"; +import { BUS_DIR, ensureDir, list } from "./bus"; +import type { MessageEnvelope, Topic } from "./types"; + +/** + * Reads envelopes from the bus matching the given topic and recipient, + * calls the handler for each unseen envelope, and marks them as seen + * in a surface-specific seen.json file. + */ +export async function subscribeOnce( + topic: T, + surface: string, + handler: (envelope: MessageEnvelope & { topic: T }) => Promise | void, + adapters = { list } +): Promise { + ensureDir(); + const seenFile = join(BUS_DIR, `seen-${surface}.json`); + let seenIds: Set; + + try { + if (existsSync(seenFile)) { + const data = JSON.parse(readFileSync(seenFile, "utf8")); + seenIds = new Set(Array.isArray(data) ? data : []); + } else { + seenIds = new Set(); + } + } catch { + seenIds = new Set(); + } + + // Get all envelopes matching topic and targeted at this surface (or broadcast) + const envelopes = adapters.list({ topic, to: surface as any }); + + let newlySeen = false; + + for (const envelope of envelopes) { + if (!seenIds.has(envelope.id)) { + try { + await handler(envelope as MessageEnvelope & { topic: T }); + seenIds.add(envelope.id); + newlySeen = true; + } catch (err) { + // If handler fails, we do NOT mark as seen, so it can be retried next tick + console.error(`[subscribeOnce] Handler for ${envelope.id} failed:`, err); + } + } + } + + if (newlySeen) { + try { + writeFileSync(seenFile, JSON.stringify(Array.from(seenIds), null, 2)); + } catch (err) { + console.error(`[subscribeOnce] Failed to write seen file:`, err); + } + } +}