Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
id: B-0449
priority: P1
status: open
status: shipped
title: "bg-services slice 5 — subscriber-agent architecture design pass (closes the foreground-optional architectural claim)"
tier: factory-infrastructure
effort: M
Expand Down Expand Up @@ -98,7 +98,7 @@ queue work into step 3 (pick speculative work).

## Acceptance criteria (design-pass)

- [ ] Library `tools/bus/subscribe.ts` exports `subscribeOnce(topic, handler)` that:
- [x] Library `tools/bus/subscribe.ts` exports `subscribeOnce(topic, handler)` that:
- Reads the bus directory (honors `ZETA_BUS_DIR` env var; defaults to
`/tmp/zeta-bus/` — same configurable convention the existing
`tools/bus/bus.ts` + `tools/bus/claim.ts` already use, so production
Expand All @@ -108,17 +108,17 @@ queue work into step 3 (pick speculative work).
- Calls handler(envelope) for each match
- Marks-as-consumed via a `seen.json` file per surface in the same
bus directory (prevents re-processing; honors `ZETA_BUS_DIR`)
- [ ] `docs/AUTONOMOUS-LOOP-PER-TICK.md` step 1 (refresh) updated to call
- [x] `docs/AUTONOMOUS-LOOP-PER-TICK.md` step 1 (refresh) updated to call
`subscribeOnce` for each of the three topics
- [ ] Per-topic handlers are STUBS in this slice — they log envelope
- [x] Per-topic handlers are STUBS in this slice — they log envelope
Comment on lines +111 to +113
to tick shard but take no action. Subsequent slices flesh out:
- `infinite-backlog-nudge` handler → triggers decomposition or
backlog grind (slice 5.1)
- `work-assignment` handler → claim-and-implement an ambiguous
item (slice 5.2)
- `missed-substrate-cascade` handler → open recovery PR (slice 5.3)
- [ ] Tests cover `subscribeOnce` (DST-replayable with fake bus dir)
- [ ] Substrate-honest disclaimer in `tools/bg/README.md` updated:
- [x] Tests cover `subscribeOnce` (DST-replayable with fake bus dir)
- [x] Substrate-honest disclaimer in `tools/bg/README.md` updated:
"subscribers consume envelopes but actions are STUB; slice 5.N
flesh out per-topic behavior"

Expand Down
69 changes: 69 additions & 0 deletions tools/bus/subscribe.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import { describe, expect, test, mock } from "bun:test";
import { subscribeOnce } from "./subscribe";
import type { MessageEnvelope, Topic } from "./types";
import { rmSync } from "node:fs";
import { join } from "node:path";
import { BUS_DIR } from "./bus";

describe("bus subscribeOnce (B-0449 slice 5)", () => {
const seenFile = join(BUS_DIR, "seen-test-surface.json");

// Helper to clear state
function clearState() {
try { rmSync(seenFile); } catch {}
Comment on lines +9 to +13
}

test("calls handler for unseen envelopes and records seen state", async () => {
clearState();

const env1: MessageEnvelope = {
id: "env-1",
from: "otto",
to: "test-surface" as any,
timestamp: new Date().toISOString(),
expiresAt: new Date(Date.now() + 10000).toISOString(),
topic: "work-assignment",
payload: { rowId: "B-1234", priority: "P1", rationale: "test" },
};

const fakeList = mock((_opts: { topic?: Topic; to?: string }) => {
return [env1];
});

const handler = mock(async (env) => {});

await subscribeOnce("work-assignment", "test-surface", handler, { list: fakeList as any });

expect(handler).toHaveBeenCalledTimes(1);
expect(handler).toHaveBeenCalledWith(env1);

// Call again, should not trigger handler because it was recorded in seen-test-surface.json
await subscribeOnce("work-assignment", "test-surface", handler, { list: fakeList as any });
expect(handler).toHaveBeenCalledTimes(1); // Still 1
});

test("does not mark as seen if handler throws", async () => {
clearState();

const env2: MessageEnvelope = {
id: "env-2",
from: "otto",
to: "test-surface" as any,
timestamp: new Date().toISOString(),
expiresAt: new Date(Date.now() + 10000).toISOString(),
topic: "work-assignment",
payload: { rowId: "B-2222", priority: "P2", rationale: "test2" },
};

const fakeList = mock(() => [env2]);
const handlerFailing = mock(async (env) => { throw new Error("fail"); });

await subscribeOnce("work-assignment", "test-surface", handlerFailing, { list: fakeList as any });

expect(handlerFailing).toHaveBeenCalledTimes(1);

// Call again, should retry because it failed and wasn't marked seen
await subscribeOnce("work-assignment", "test-surface", handlerFailing, { list: fakeList as any });
expect(handlerFailing).toHaveBeenCalledTimes(2);
});
});
57 changes: 57 additions & 0 deletions tools/bus/subscribe.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { join } from "node:path";
import { existsSync, readFileSync, writeFileSync } from "node:fs";
import { BUS_DIR, ensureDir, list } from "./bus";
import type { MessageEnvelope, Topic } from "./types";

/**
* Reads envelopes from the bus matching the given topic and recipient,
* calls the handler for each unseen envelope, and marks them as seen
* in a surface-specific seen.json file.
*/
export async function subscribeOnce<T extends Topic>(
topic: T,
surface: string,
handler: (envelope: MessageEnvelope & { topic: T }) => Promise<void> | void,
adapters = { list }
): Promise<void> {
ensureDir();
const seenFile = join(BUS_DIR, `seen-${surface}.json`);
let seenIds: Set<string>;

try {
if (existsSync(seenFile)) {
const data = JSON.parse(readFileSync(seenFile, "utf8"));
seenIds = new Set(Array.isArray(data) ? data : []);
} else {
seenIds = new Set();
}
} catch {
seenIds = new Set();
Comment on lines +28 to +29
}

// Get all envelopes matching topic and targeted at this surface (or broadcast)
const envelopes = adapters.list({ topic, to: surface as any });

let newlySeen = false;

for (const envelope of envelopes) {
if (!seenIds.has(envelope.id)) {
try {
await handler(envelope as MessageEnvelope & { topic: T });
seenIds.add(envelope.id);
newlySeen = true;
} catch (err) {
// If handler fails, we do NOT mark as seen, so it can be retried next tick
console.error(`[subscribeOnce] Handler for ${envelope.id} failed:`, err);
}
}
}

if (newlySeen) {
try {
writeFileSync(seenFile, JSON.stringify(Array.from(seenIds), null, 2));

Check failure

Code scanning / CodeQL

Potential file system race condition High

The file may have changed since it
was checked
.
Comment on lines +50 to +52
Comment on lines +50 to +52
} catch (err) {
console.error(`[subscribeOnce] Failed to write seen file:`, err);
}
}
}
Loading